diff --git a/swh/web/ui/api.py b/swh/web/ui/api.py index 08c27c69..6b459ca2 100644 --- a/swh/web/ui/api.py +++ b/swh/web/ui/api.py @@ -1,870 +1,878 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from types import GeneratorType from flask import request, url_for, Response, redirect from swh.web.ui import service, utils from swh.web.ui.exc import BadInputExc, NotFoundExc from swh.web.ui.main import app @app.route('/api/1/stat/counters/') def api_stats(): """Return statistics on SWH storage. Returns: SWH storage's statistics. """ return service.stat_counters() @app.route('/api/1/search/') @app.route('/api/1/search//') def api_search(q='sha1:bd819b5b28fcde3bf114d16a44ac46250da94ee5'): """Search a content per hash. Args: q is of the form algo_hash:hash with algo_hash in (sha1, sha1_git, sha256). Returns: Dictionary with 'found' key and the associated result. Raises: BadInputExc in case of unknown algo_hash or bad hash. Example: GET /api/1/search/sha1:bd819b5b28fcde3bf114d16a44ac46250da94ee5/ """ r = service.lookup_hash(q).get('found') return {'found': True if r else False} def _api_lookup(criteria, lookup_fn, error_msg_if_not_found, enrich_fn=lambda x: x, *args): """Capture a redundant behavior of: - looking up the backend with a criteria (be it an identifier or checksum) passed to the function lookup_fn - if nothing is found, raise an NotFoundExc exception with error message error_msg_if_not_found. - Otherwise if something is returned: - either as list, map or generator, map the enrich_fn function to it and return the resulting data structure as list. - either as dict and pass to enrich_fn and return the dict enriched. Args: - criteria: discriminating criteria to lookup - lookup_fn: function expects one criteria and optional supplementary *args. 
- error_msg_if_not_found: if nothing matching the criteria is found, raise NotFoundExc with this error message. - enrich_fn: Function to use to enrich the result returned by lookup_fn. Default to the identity function if not provided. - *args: supplementary arguments to pass to lookup_fn. Raises: NotFoundExp or whatever `lookup_fn` raises. """ res = lookup_fn(criteria, *args) if not res: raise NotFoundExc(error_msg_if_not_found) if isinstance(res, (map, list, GeneratorType)): enriched_data = [] for e in res: enriched_data.append(enrich_fn(e)) return enriched_data return enrich_fn(res) @app.route('/api/1/origin/') @app.route('/api/1/origin//') def api_origin(origin_id=1): """Return information about origin with id origin_id. Args: origin_id: the origin's identifier. Returns: Information on the origin if found. Raises: NotFoundExc if the origin is not found. Example: GET /api/1/origin/1/ """ return _api_lookup( origin_id, lookup_fn=service.lookup_origin, error_msg_if_not_found='Origin with id %s not found.' % origin_id) @app.route('/api/1/person/') @app.route('/api/1/person//') def api_person(person_id=1): """Return information about person with identifier person_id. Args: person_id: the person's identifier. Returns: Information on the person if found. Raises: NotFoundExc if the person is not found. Example: GET /api/1/person/1/ """ return _api_lookup( person_id, lookup_fn=service.lookup_person, error_msg_if_not_found='Person with id %s not found.' % person_id) def _enrich_release(release): """Enrich a release with link to the 'target' of 'type' revision. """ if 'target' in release and \ 'target_type' in release and \ release['target_type'] == 'revision': release['target_url'] = url_for('api_revision', sha1_git=release['target']) return release @app.route('/api/1/release/') @app.route('/api/1/release//') def api_release(sha1_git='1e951912027ea6873da6985b91e50c47f645ae1a'): """Return information about release with id sha1_git. Args: sha1_git: the release's hash. 
Returns: Information on the release if found. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if the release is not found. Example: GET /api/1/release/b307094f00c3641b0c9da808d894f3a325371414 """ error_msg = 'Release with sha1_git %s not found.' % sha1_git return _api_lookup( sha1_git, lookup_fn=service.lookup_release, error_msg_if_not_found=error_msg, enrich_fn=_enrich_release) def _enrich_revision_with_urls(revision, context=None): """Enrich revision with links where it makes sense (directory, parents). """ if not context: context = revision['id'] revision['url'] = url_for('api_revision', sha1_git=revision['id']) revision['history_url'] = url_for('api_revision_log', sha1_git=revision['id']) if 'directory' in revision: revision['directory_url'] = url_for('api_directory', sha1_git=revision['directory']) if 'parents' in revision: parents = [] for parent in revision['parents']: parents.append(url_for('api_revision_history', sha1_git_root=context, sha1_git=parent)) revision['parent_urls'] = parents if 'children' in revision: children = [] for child in revision['children']: children.append(url_for('api_revision_history', sha1_git_root=context, sha1_git=child)) revision['children_urls'] = children return revision @app.route('/api/1/revision' '/origin/' '/directory/') @app.route('/api/1/revision' '/origin/' '/directory//') @app.route('/api/1/revision' '/origin/' '/branch/' '/directory/') @app.route('/api/1/revision' '/origin/' '/branch/' '/directory//') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts/' '/directory/') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts/' '/directory//') def api_directory_through_revision_with_origin(origin_id=1, branch_name="refs/heads/master", ts=None, path=None): """Display directory or content information through a revision identified by origin/branch/timestamp. Args: origin_id: origin's identifier (default to 1). branch_name: the optional branch for the given origin (default to master). 
timestamp: optional timestamp (default to the nearest time crawl of timestamp). path: Path to directory or file to display. Returns: Information on the directory or content pointed to by such revision. Raises: NotFoundExc if the revision is not found or the path pointed to is not found. """ if ts: ts = utils.parse_timestamp(ts) revision = service.lookup_revision_by(origin_id, branch_name, ts) if not revision: raise NotFoundExc('Revision with (origin_id: %s, branch_name: %s' ', ts: %s) not found.' % (origin_id, branch_name, ts)) return _revision_directory(revision['id'], path, request.path) @app.route('/api/1/revision' '/origin/' '/history//') @app.route('/api/1/revision' '/origin/' '/branch/' '/history//') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts/' '/history//') def api_history_through_revision_with_origin(origin_id=1, branch_name="refs/heads/master", ts=None, sha1_git=None): """ Return information about revision sha1_git, limited to the sub-graph of all transitive parents of the revision root identified by (origin_id, branch_name, ts). Given sha1_git_root such root revision's identifier, in other words, sha1_git is an ancestor of sha1_git_root. Args: origin_id: origin's identifier (default to 1). branch_name: the optional branch for the given origin (default to master). timestamp: optional timestamp (default to the nearest time crawl of timestamp). sha1_git: one of sha1_git_root's ancestors. limit: optional query parameter to limit the revisions log (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of sha1_git_root (even if it is). Returns: Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root. 
""" limit = int(request.args.get('limit', '100')) if ts: ts = utils.parse_timestamp(ts) rev_root, revision = service.lookup_revision_with_context_by( origin_id, branch_name, ts, sha1_git, limit) if not revision: raise NotFoundExc( "Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s' " "sha1_git_root being the revision's identifier pointed to by " "(origin_id: %s, branch_name: %s, ts: %s)." % (sha1_git, rev_root['id'], origin_id, branch_name, ts)) return _enrich_revision_with_urls(revision, context=rev_root['id']) @app.route('/api/1/revision' '/origin/' '/history/' '/directory/') @app.route('/api/1/revision' '/origin/' '/history/' '/directory//') @app.route('/api/1/revision' '/origin/' '/branch/' '/history/' '/directory/') @app.route('/api/1/revision' '/origin/' '/branch/' '/history/' '/directory//') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts/' '/history/' '/directory/') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts/' '/history/' '/directory//') def api_directory_through_revision_with_origin_history( origin_id=1, branch_name="refs/heads/master", ts=None, sha1_git=None, path=None): """Return information about directory or content pointed to by the revision defined as: revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root (being the identified sha1 by looking up origin_id/branch_name/ts) Args: origin_id: origin's identifier (default to 1). branch_name: the optional branch for the given origin (default to master). timestamp: optional timestamp (default to the nearest time crawl of timestamp). sha1_git: one of sha1_git_root's ancestors. path: optional directory or content pointed to by that revision. limit: optional query parameter to limit the revisions log (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of sha1_git_root (even if it is). Returns: Information on the directory pointed to by that revision. 
Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root or the path referenced does not exist. """ limit = int(request.args.get('limit', '100')) if ts: ts = utils.parse_timestamp(ts) rev_root, revision = service.lookup_revision_with_context_by(origin_id, branch_name, ts, sha1_git, limit) if not revision: raise NotFoundExc( "Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s' " "sha1_git_root being the revision's identifier pointed to by " "(origin_id: %s, branch_name: %s, ts: %s)." % (sha1_git, rev_root['id'], origin_id, branch_name, ts)) return _revision_directory(revision['id'], path, request.path) @app.route('/api/1/revision' '/origin/') @app.route('/api/1/revision' '/origin//') @app.route('/api/1/revision' '/origin/' '/branch//') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts//') @app.route('/api/1/revision' '/origin/' '/ts//') def api_revision_with_origin(origin_id=1, branch_name="refs/heads/master", ts=None): """Instead of having to specify a (root) revision by SHA1_GIT, users might want to specify a place and a time. In SWH a "place" is an origin; a "time" is a timestamp at which some place has been observed by SWH crawlers. Args: origin_id: origin's identifier (default to 1). branch_name: the optional branch for the given origin (default to master). timestamp: optional timestamp (default to the nearest time crawl of timestamp). Returns: Information on the revision if found. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if the revision is not found. """ if ts: ts = utils.parse_timestamp(ts) return _api_lookup( origin_id, service.lookup_revision_by, 'Revision with (origin_id: %s, branch_name: %s' ', ts: %s) not found.' 
% (origin_id, branch_name, ts), _enrich_revision_with_urls, branch_name, ts) @app.route('/api/1/revision/') @app.route('/api/1/revision//') def api_revision(sha1_git='a585d2b738bfa26326b3f1f40f0f1eda0c067ccf'): """Return information about revision with id sha1_git. Args: sha1_git: the revision's hash. Returns: Information on the revision if found. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if the revision is not found. Example: GET /api/1/revision/baf18f9fc50a0b6fef50460a76c33b2ddc57486e """ return _api_lookup( sha1_git, lookup_fn=service.lookup_revision, error_msg_if_not_found='Revision with sha1_git %s not' ' found.' % sha1_git, enrich_fn=_enrich_revision_with_urls) def _enrich_directory(directory, context_url=None): """Enrich directory with url to content or directory. """ if 'type' in directory: target_type = directory['type'] target = directory['target'] if target_type == 'file': directory['target_url'] = url_for('api_content_with_details', q='sha1_git:%s' % target) if context_url: directory['file_url'] = context_url + directory['name'] + '/' else: directory['target_url'] = url_for('api_directory', sha1_git=target) if context_url: directory['dir_url'] = context_url + directory['name'] + '/' return directory def _revision_directory(rev_sha1_git, dir_path, request_path): """Compute the revision rev_sha1_git's directory or content data. """ def enrich_directory_local(dir, context_url=request_path): return _enrich_directory(dir, context_url) result = service.lookup_directory_with_revision(rev_sha1_git, dir_path) if not result: raise NotFoundExc('Revision with sha1_git %s not' ' found.' 
% rev_sha1_git) if result['type'] == 'dir': # dir_entries - return list(map(enrich_directory_local, result['content'])) + return { + 'type': 'dir', + 'revision': rev_sha1_git, + 'content': list(map(enrich_directory_local, result['content'])) + } else: # content - return _enrich_content(result['content']) + return { + 'type': 'file', + 'revision': rev_sha1_git, + 'content': _enrich_content(result['content']) + } @app.route('/api/1/revision//directory/') @app.route('/api/1/revision//directory//') def api_directory_with_revision( sha1_git='a585d2b738bfa26326b3f1f40f0f1eda0c067ccf', dir_path=None): """Return information on directory pointed by revision with sha1_git. If dir_path is not provided, display top level directory. Otherwise, display the directory pointed by dir_path (if it exists). Args: sha1_git: revision's hash. dir_path: optional directory pointed to by that revision. Returns: Information on the directory pointed to by that revision. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc either if the revision is not found or the path referenced does not exist Example: GET /api/1/revision/baf18f9fc50a0b6fef50460a76c33b2ddc57486e/directory/ """ return _revision_directory(sha1_git, dir_path, request.path) @app.route('/api/1/revision//history//') def api_revision_history(sha1_git_root, sha1_git): """Return information about revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. In other words, sha1_git is an ancestor of sha1_git_root. Args: sha1_git_root: latest revision of the browsed history. sha1_git: one of sha1_git_root's ancestors. limit: optional query parameter to limit the revisions log (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of sha1_git_root (even if it is). Returns: Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root. 
Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root. """ limit = int(request.args.get('limit', '100')) if sha1_git == sha1_git_root: return redirect(url_for('api_revision', sha1_git=sha1_git, limit=limit)) revision = service.lookup_revision_with_context(sha1_git_root, sha1_git, limit) if not revision: raise NotFoundExc( "Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s'" % (sha1_git, sha1_git_root)) return _enrich_revision_with_urls(revision, context=sha1_git_root) @app.route('/api/1/revision/' '/history/' '/directory/') @app.route('/api/1/revision/' '/history/' '/directory//') def api_directory_revision_history(sha1_git_root, sha1_git, dir_path=None): """Return information about directory pointed to by the revision defined as: revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. Args: sha1_git_root: latest revision of the browsed history. sha1_git: one of sha1_git_root's ancestors. dir_path: optional directory pointed to by that revision. limit: optional query parameter to limit the revisions log (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of sha1_git_root (even if it is). Returns: Information on the directory pointed to by that revision. Raises: BadInputExc in case of unknown algo_hash or bad hash. 
NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root or the path referenced does not exist """ limit = int(request.args.get('limit', '100')) if sha1_git == sha1_git_root: return redirect(url_for('api_directory_with_revision', sha1_git=sha1_git, dir_path=dir_path), code=301) revision = service.lookup_revision_with_context(sha1_git_root, sha1_git, limit) if not revision: raise NotFoundExc( "Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s'" % (sha1_git, sha1_git_root)) return _revision_directory(revision['id'], dir_path, request.path) @app.route('/api/1/revision//log/') def api_revision_log(sha1_git): """Show all revisions (~git log) starting from sha1_git. The first element returned is the given sha1_git. Args: sha1_git: the revision's hash. limit: optional query parameter to limit the revisions log (default to 100). Returns: Information on the revision if found. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if the revision is not found. """ limit = int(request.args.get('limit', '100')) def lookup_revision_log_with_limit(s, limit=limit): return service.lookup_revision_log(s, limit) error_msg = 'Revision with sha1_git %s not found.' % sha1_git return _api_lookup(sha1_git, lookup_fn=lookup_revision_log_with_limit, error_msg_if_not_found=error_msg, enrich_fn=_enrich_revision_with_urls) @app.route('/api/1/directory/') @app.route('/api/1/directory//') def api_directory(sha1_git='dcf3289b576b1c8697f2a2d46909d36104208ba3'): """Return information about release with id sha1_git. Args: sha1_git: Directory's sha1_git. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if the content is not found. Example: GET /api/1/directory/8d7dc91d18546a91564606c3e3695a5ab568d179 """ error_msg = 'Directory with sha1_git %s not found.' 
% sha1_git return _api_lookup( sha1_git, lookup_fn=service.lookup_directory, error_msg_if_not_found=error_msg, enrich_fn=_enrich_directory) # @app.route('/api/1/browse/') # @app.route('/api/1/browse//') def api_content_checksum_to_origin(q='sha1_git:26ac0281bc74e9bd8a4a4aab1c7c7a' '0c19d4436c'): """Return content information up to one of its origin if the content is found. Args: q is of the form algo_hash:hash with algo_hash in (sha1, sha1_git, sha256). Returns: Information on one possible origin for such content. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if the content is not found. Example: GET /api/1/browse/sha1_git:88b9b366facda0b5ff8d8640ee9279bed346f242 """ found = service.lookup_hash(q)['found'] if not found: raise NotFoundExc('Content with %s not found.' % q) return service.lookup_hash_origin(q) @app.route('/api/1/content//raw/') def api_content_raw(q): """Return content's raw data if content is found. Args: q is of the form (algo_hash:)hash with algo_hash in (sha1, sha1_git, sha256). When algo_hash is not provided, 'hash' is considered sha1. Returns: Content's raw data in application/octet-stream. Raises: - BadInputExc in case of unknown algo_hash or bad hash - NotFoundExc if the content is not found. """ def generate(content): yield content['data'] content = service.lookup_content_raw(q) if not content: raise NotFoundExc('Content with %s not found.' % q) return Response(generate(content), mimetype='application/octet-stream') def _enrich_content(content): """Enrich content with 'data', a link to its raw content. """ content['data_url'] = url_for('api_content_raw', q=content['sha1']) return content @app.route('/api/1/content/') @app.route('/api/1/content//') def api_content_with_details(q='sha256:e2c76e40866bb6b28916387bdfc8649beceb' '523015738ec6d4d540c7fe65232b'): """Return content information if content is found. Args: q is of the form (algo_hash:)hash with algo_hash in (sha1, sha1_git, sha256). 
When algo_hash is not provided, 'hash' is considered sha1. Returns: Content's information. Raises: - BadInputExc in case of unknown algo_hash or bad hash. - NotFoundExc if the content is not found. Example: GET /api/1/content/sha256:e2c76e40866bb6b28916387bdfc8649beceb 523015738ec6d4d540c7fe65232b """ return _api_lookup( q, lookup_fn=service.lookup_content, error_msg_if_not_found='Content with %s not found.' % q, enrich_fn=_enrich_content) def _enrich_entity(entity): """Enrich entity with """ entity['uuid_url'] = url_for('api_entity_by_uuid', uuid=entity['uuid']) if 'parent' in entity and entity['parent']: entity['parent_url'] = url_for('api_entity_by_uuid', uuid=entity['parent']) return entity @app.route('/api/1/entity/') @app.route('/api/1/entity//') def api_entity_by_uuid(uuid='5f4d4c51-498a-4e28-88b3-b3e4e8396cba'): """Return content information if content is found. Args: q is of the form (algo_hash:)hash with algo_hash in (sha1, sha1_git, sha256). When algo_hash is not provided, 'hash' is considered sha1. Returns: Content's information. Raises: - BadInputExc in case of unknown algo_hash or bad hash. - NotFoundExc if the content is not found. Example: - GET /api/1/entity/5f4d4c51-498a-4e28-88b3-b3e4e8396cba/ - GET /api/1/entity/7c33636b-8f11-4bda-89d9-ba8b76a42cec/ """ return _api_lookup( uuid, lookup_fn=service.lookup_entity_by_uuid, error_msg_if_not_found="Entity with uuid '%s' not found." % uuid, enrich_fn=_enrich_entity) @app.route('/api/1/uploadnsearch/', methods=['POST']) def api_uploadnsearch(): """Upload the file's content in the post body request. Compute its hash and determine if it exists in the storage. Args: request.files filled with the filename's data to upload. Returns: Dictionary with 'sha1', 'filename' and 'found' predicate depending on whether we find it or not. Raises: BadInputExc in case of the form submitted is incorrect. 
""" file = request.files.get('filename') if not file: raise BadInputExc("Bad request, missing 'filename' entry in form.") return service.upload_and_search(file) diff --git a/swh/web/ui/templates/revision-directory.html b/swh/web/ui/templates/revision-directory.html index 774d083f..e5d4f1e9 100644 --- a/swh/web/ui/templates/revision-directory.html +++ b/swh/web/ui/templates/revision-directory.html @@ -1,29 +1,33 @@ {% extends "layout.html" %} {% block title %}Browse revision at path{% endblock %} {% block content %} {% if message is not none %} {{ message }} {% endif %} {% if result is not none %} -

Browse revision '{{ sha1_git }}' with {{ result['type'] }}path '{{ path }}':

+

Browse revision '{{ revision }}' with {{ result['type'] }}path '{{ path }}':

{% if result['type'] == 'dir' %} {% if result['content'] is not none %} {% for e in result['content'] %} - + {% if e.type == 'dir' %} + + {% else %} + + {% endif %} {% endfor %} {% endif %} {% else %} {% if result['content'] is not none %} {% for key in ['sha1', 'sha256', 'sha1_git', 'status', 'length', 'ctime'] %}
{{ key }}
{{ result['content'][key] }}
{% endfor %} {% endif %} {% endif %} {% endif %} {% endblock %} diff --git a/swh/web/ui/tests/test_api.py b/swh/web/ui/tests/test_api.py index 787fa252..4706d38e 100644 --- a/swh/web/ui/tests/test_api.py +++ b/swh/web/ui/tests/test_api.py @@ -1,1976 +1,1990 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json import unittest import yaml from nose.tools import istest from unittest.mock import patch, MagicMock from swh.web.ui.tests import test_app from swh.web.ui import api, exc from swh.web.ui.exc import NotFoundExc, BadInputExc from swh.storage.exc import StorageDBError, StorageAPIError class ApiTestCase(test_app.SWHApiTestCase): @istest def generic_api_lookup_Nothing_is_found(self): # given def test_generic_lookup_fn(sha1, another_unused_arg): assert another_unused_arg == 'unused arg' assert sha1 == 'sha1' return None # when with self.assertRaises(NotFoundExc) as cm: api._api_lookup('sha1', test_generic_lookup_fn, 'This will be raised because None is returned.', lambda x: x, 'unused arg') self.assertIn('This will be raised because None is returned.', cm.exception.args[0]) @istest def generic_api_map_are_enriched_and_transformed_to_list(self): # given def test_generic_lookup_fn_1(criteria0, param0, param1): assert criteria0 == 'something' return map(lambda x: x + 1, [1, 2, 3]) # when actual_result = api._api_lookup( 'something', test_generic_lookup_fn_1, 'This is not the error message you are looking for. 
Move along.', lambda x: x * 2, 'some param 0', 'some param 1') self.assertEqual(actual_result, [4, 6, 8]) @istest def generic_api_list_are_enriched_too(self): # given def test_generic_lookup_fn_2(crit): assert crit == 'something' return ['a', 'b', 'c'] # when actual_result = api._api_lookup( 'something', test_generic_lookup_fn_2, 'Not the error message you are looking for, it is. ' 'Along, you move!', lambda x: ''. join(['=', x, '='])) self.assertEqual(actual_result, ['=a=', '=b=', '=c=']) @istest def generic_api_generator_are_enriched_and_returned_as_list(self): # given def test_generic_lookup_fn_3(crit): assert crit == 'crit' return (i for i in [4, 5, 6]) # when actual_result = api._api_lookup( 'crit', test_generic_lookup_fn_3, 'Move!', lambda x: x - 1) self.assertEqual(actual_result, [3, 4, 5]) @istest def generic_api_simple_data_are_enriched_and_returned_too(self): # given def test_generic_lookup_fn_4(crit): assert crit == '123' return {'a': 10} def test_enrich_data(x): x['a'] = x['a'] * 10 return x # when actual_result = api._api_lookup( '123', test_generic_lookup_fn_4, 'Nothing to do', test_enrich_data) self.assertEqual(actual_result, {'a': 100}) @patch('swh.web.ui.api.service') # @istest def api_content_checksum_to_origin(self, mock_service): mock_service.lookup_hash.return_value = {'found': True} stub_origin = { "lister": None, "url": "rsync://ftp.gnu.org/old-gnu/webbase", "type": "ftp", "id": 2, "project": None } mock_service.lookup_hash_origin.return_value = stub_origin # when rv = self.app.get( '/api/1/browse/sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_origin) mock_service.lookup_hash.assert_called_once_with( 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03') mock_service.lookup_hash_origin.assert_called_once_with( 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03') 
@patch('swh.web.ui.api.service') # @istest def api_content_checksum_to_origin_sha_not_found(self, mock_service): # given mock_service.lookup_hash.return_value = {'found': False} # when rv = self.app.get( '/api/1/browse/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6' '6c5b00a6d03 not found.' }) mock_service.lookup_hash.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.api.service') @istest def api_content_with_details(self, mock_service): # given mock_service.lookup_content.return_value = { 'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882', 'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560' 'cde9b067a4f', 'length': 17, 'status': 'visible' } # when rv = self.app.get( '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'data_url': '/api/1/content/' '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/raw/', 'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882', 'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560c' 'de9b067a4f', 'length': 17, 'status': 'visible' }) mock_service.lookup_content.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.api.service') @istest def api_content_not_found_as_json(self, mock_service): # given mock_service.lookup_content.return_value = None mock_service.lookup_hash_origin = MagicMock() # when rv = self.app.get( '/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3' 'be4735637006560c/') 
self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79' '68b3be4735637006560c not found.' }) mock_service.lookup_content.assert_called_once_with( 'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3' 'be4735637006560c') mock_service.lookup_hash_origin.called = False @patch('swh.web.ui.api.service') @istest def api_content_not_found_as_yaml(self, mock_service): # given mock_service.lookup_content.return_value = None mock_service.lookup_hash_origin = MagicMock() # when rv = self.app.get( '/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3' 'be4735637006560c/', headers={'accept': 'application/yaml'}) self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/yaml') response_data = yaml.load(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79' '68b3be4735637006560c not found.' 
}) mock_service.lookup_content.assert_called_once_with( 'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3' 'be4735637006560c') mock_service.lookup_hash_origin.called = False @patch('swh.web.ui.api.service') @istest def api_content_raw(self, mock_service): # given stub_content = {'data': b'some content data'} mock_service.lookup_content_raw.return_value = stub_content # when rv = self.app.get( '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03' '/raw/', headers={'Content-type': 'application/octet-stream', 'Content-disposition': 'attachment'}) self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/octet-stream') self.assertEquals(rv.data, stub_content['data']) mock_service.lookup_content_raw.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.api.service') @istest def api_content_raw_not_found(self, mock_service): # given mock_service.lookup_content_raw.return_value = None # when rv = self.app.get( '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03' '/raw/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6' '6c5b00a6d03 not found.' 
}) mock_service.lookup_content_raw.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.api.service') @istest def api_search(self, mock_service): # given mock_service.lookup_hash.return_value = { 'found': { 'sha1': 'or something' } } # when rv = self.app.get('/api/1/search/sha1:blah/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, {'found': True}) mock_service.lookup_hash.assert_called_once_with('sha1:blah') @patch('swh.web.ui.api.service') @istest def api_search_as_yaml(self, mock_service): # given mock_service.lookup_hash.return_value = { 'found': { 'sha1': 'sha1 hash' } } # when rv = self.app.get('/api/1/search/sha1:halb/', headers={'Accept': 'application/yaml'}) self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/yaml') response_data = yaml.load(rv.data.decode('utf-8')) self.assertEquals(response_data, {'found': True}) mock_service.lookup_hash.assert_called_once_with('sha1:halb') @patch('swh.web.ui.api.service') @istest def api_search_not_found(self, mock_service): # given mock_service.lookup_hash.return_value = {} # when rv = self.app.get('/api/1/search/sha1:halb/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, {'found': False}) mock_service.lookup_hash.assert_called_once_with('sha1:halb') @patch('swh.web.ui.api.service') @istest def api_1_stat_counters_raise_error(self, mock_service): # given mock_service.stat_counters.side_effect = ValueError( 'voluntary error to check the bad request middleware.') # when rv = self.app.get('/api/1/stat/counters/') # then self.assertEquals(rv.status_code, 400) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 
'error': 'voluntary error to check the bad request middleware.'}) @patch('swh.web.ui.api.service') @istest def api_1_stat_counters_raise_swh_storage_error_db(self, mock_service): # given mock_service.stat_counters.side_effect = StorageDBError( 'SWH Storage exploded! Will be back online shortly!') # when rv = self.app.get('/api/1/stat/counters/') # then self.assertEquals(rv.status_code, 503) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'An unexpected error occurred in the backend: ' 'SWH Storage exploded! Will be back online shortly!'}) @patch('swh.web.ui.api.service') @istest def api_1_stat_counters_raise_swh_storage_error_api(self, mock_service): # given mock_service.stat_counters.side_effect = StorageAPIError( 'SWH Storage API dropped dead! Will resurrect from its ashes asap!' ) # when rv = self.app.get('/api/1/stat/counters/') # then self.assertEquals(rv.status_code, 503) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'An unexpected error occurred in the api backend: ' 'SWH Storage API dropped dead! Will resurrect from its ashes asap!' 
}) @patch('swh.web.ui.api.service') @istest def api_1_stat_counters(self, mock_service): # given stub_stats = { "content": 1770830, "directory": 211683, "directory_entry_dir": 209167, "directory_entry_file": 1807094, "directory_entry_rev": 0, "entity": 0, "entity_history": 0, "occurrence": 0, "occurrence_history": 19600, "origin": 1096, "person": 0, "release": 8584, "revision": 7792, "revision_history": 0, "skipped_content": 0 } mock_service.stat_counters.return_value = stub_stats # when rv = self.app.get('/api/1/stat/counters/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_stats) mock_service.stat_counters.assert_called_once_with() @patch('swh.web.ui.api.service') @patch('swh.web.ui.api.request') @istest def api_uploadnsearch_bad_input(self, mock_request, mock_service): # given mock_request.files = {} # when rv = self.app.post('/api/1/uploadnsearch/') self.assertEquals(rv.status_code, 400) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': "Bad request, missing 'filename' entry in form."}) mock_service.upload_and_search.called = False @patch('swh.web.ui.api.service') @patch('swh.web.ui.api.request') @istest def api_uploadnsearch(self, mock_request, mock_service): # given mock_request.files = {'filename': 'simple-filename'} mock_service.upload_and_search.return_value = { 'filename': 'simple-filename', 'sha1': 'some-hex-sha1', 'found': False, } # when rv = self.app.post('/api/1/uploadnsearch/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, {'filename': 'simple-filename', 'sha1': 'some-hex-sha1', 'found': False}) mock_service.upload_and_search.assert_called_once_with( 'simple-filename') 
@patch('swh.web.ui.api.service') @istest def api_origin(self, mock_service): # given stub_origin = { 'id': 1234, 'lister': 'uuid-lister-0', 'project': 'uuid-project-0', 'url': 'ftp://some/url/to/origin/0', 'type': 'ftp' } mock_service.lookup_origin.return_value = stub_origin # when rv = self.app.get('/api/1/origin/1234/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_origin) mock_service.lookup_origin.assert_called_with(1234) @patch('swh.web.ui.api.service') @istest def api_origin_not_found(self, mock_service): # given mock_service.lookup_origin.return_value = None # when rv = self.app.get('/api/1/origin/4321/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Origin with id 4321 not found.' }) mock_service.lookup_origin.assert_called_with(4321) @patch('swh.web.ui.api.service') @istest def api_release(self, mock_service): # given stub_release = { 'id': 'release-0', 'target_type': 'revision', 'target': 'revision-sha1', "date": "Mon, 10 Mar 1997 08:00:00 GMT", "synthetic": True, 'author': { 'name': 'author release name', 'email': 'author@email', }, } expected_release = { 'id': 'release-0', 'target_type': 'revision', 'target': 'revision-sha1', 'target_url': '/api/1/revision/revision-sha1/', "date": "Mon, 10 Mar 1997 08:00:00 GMT", "synthetic": True, 'author': { 'name': 'author release name', 'email': 'author@email', }, } mock_service.lookup_release.return_value = stub_release # when rv = self.app.get('/api/1/release/release-0/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_release) 
mock_service.lookup_release.assert_called_once_with('release-0') @patch('swh.web.ui.api.service') @istest def api_release_target_type_not_a_revision(self, mock_service): # given stub_release = { 'id': 'release-0', 'target_type': 'other-stuff', 'target': 'other-stuff-checksum', "date": "Mon, 10 Mar 1997 08:00:00 GMT", "synthetic": True, 'author': { 'name': 'author release name', 'email': 'author@email', }, } expected_release = { 'id': 'release-0', 'target_type': 'other-stuff', 'target': 'other-stuff-checksum', "date": "Mon, 10 Mar 1997 08:00:00 GMT", "synthetic": True, 'author': { 'name': 'author release name', 'email': 'author@email', }, } mock_service.lookup_release.return_value = stub_release # when rv = self.app.get('/api/1/release/release-0/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_release) mock_service.lookup_release.assert_called_once_with('release-0') @patch('swh.web.ui.api.service') @istest def api_release_not_found(self, mock_service): # given mock_service.lookup_release.return_value = None # when rv = self.app.get('/api/1/release/release-0/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Release with sha1_git release-0 not found.' 
}) @patch('swh.web.ui.api.service') @istest def api_revision(self, mock_service): # given stub_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['8734ef7e7c357ce2af928115c6c6a42b7e2a44e7'], 'type': 'tar', 'synthetic': True, 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] }, } mock_service.lookup_revision.return_value = stub_revision expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/', 'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233e' 'ff7371d5/log/', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6' 'a42b7e2a44e6/', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': [ '8734ef7e7c357ce2af928115c6c6a42b7e2a44e7' ], 'parent_urls': [ '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5' '/history/8734ef7e7c357ce2af928115c6c6a42b7e2a44e7/' ], 'type': 'tar', 'synthetic': True, 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': 
'401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] }, } # when rv = self.app.get('/api/1/revision/' '18d8be353ed3480476f032475e7c233eff7371d5/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_revision) mock_service.lookup_revision.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5') @patch('swh.web.ui.api.service') @istest def api_revision_not_found(self, mock_service): # given mock_service.lookup_revision.return_value = None # when rv = self.app.get('/api/1/revision/revision-0/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Revision with sha1_git revision-0 not found.'}) @patch('swh.web.ui.api.service') @istest def api_revision_with_origin_not_found(self, mock_service): mock_service.lookup_revision_by.return_value = None rv = self.app.get('/api/1/revision/origin/123/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertIn('Revision with (origin_id: 123', response_data['error']) self.assertIn('not found', response_data['error']) mock_service.lookup_revision_by.assert_called_once_with( 123, 'refs/heads/master', None) @patch('swh.web.ui.api.service') @istest def api_revision_with_origin(self, mock_service): mock_revision = { 'id': '32', 'directory': '21', 'message': 'message 1', 'type': 'deb', } expected_revision = { 'id': '32', 'url': '/api/1/revision/32/', 'history_url': '/api/1/revision/32/log/', 'directory': '21', 'directory_url': '/api/1/directory/21/', 'message': 'message 1', 'type': 'deb', } mock_service.lookup_revision_by.return_value = mock_revision rv = self.app.get('/api/1/revision/origin/') # then 
self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, expected_revision) mock_service.lookup_revision_by.assert_called_once_with( 1, 'refs/heads/master', None) @patch('swh.web.ui.api.service') @istest def api_revision_with_origin_and_branch_name(self, mock_service): mock_revision = { 'id': '12', 'directory': '23', 'message': 'message 2', 'type': 'tar', } mock_service.lookup_revision_by.return_value = mock_revision expected_revision = { 'id': '12', 'url': '/api/1/revision/12/', 'history_url': '/api/1/revision/12/log/', 'directory': '23', 'directory_url': '/api/1/directory/23/', 'message': 'message 2', 'type': 'tar', } rv = self.app.get('/api/1/revision/origin/1/branch/refs/origin/dev/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, expected_revision) mock_service.lookup_revision_by.assert_called_once_with( 1, 'refs/origin/dev', None) @patch('swh.web.ui.api.service') @patch('swh.web.ui.api.utils') @istest def api_revision_with_origin_and_branch_name_and_timestamp(self, mock_utils, mock_service): mock_revision = { 'id': '123', 'directory': '456', 'message': 'message 3', 'type': 'tar', } mock_service.lookup_revision_by.return_value = mock_revision expected_revision = { 'id': '123', 'url': '/api/1/revision/123/', 'history_url': '/api/1/revision/123/log/', 'directory': '456', 'directory_url': '/api/1/directory/456/', 'message': 'message 3', 'type': 'tar', } mock_utils.parse_timestamp.return_value = 'parsed-date' rv = self.app.get('/api/1/revision' '/origin/1' '/branch/refs/origin/dev' '/ts/1452591542/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, expected_revision) 
mock_service.lookup_revision_by.assert_called_once_with( 1, 'refs/origin/dev', 'parsed-date') mock_utils.parse_timestamp.assert_called_once_with('1452591542') @patch('swh.web.ui.api.service') @patch('swh.web.ui.api.utils') @istest def api_revision_with_origin_and_branch_name_and_timestamp_with_escapes( self, mock_utils, mock_service): mock_revisions = [{ 'id': '999', }] mock_service.lookup_revision_by.return_value = mock_revisions expected_revisions = [{ 'id': '999', 'url': '/api/1/revision/999/', 'history_url': '/api/1/revision/999/log/', }] mock_utils.parse_timestamp.return_value = 'parsed-date' rv = self.app.get('/api/1/revision' '/origin/1' '/branch/refs%2Forigin%2Fdev' '/ts/Today%20is%20' 'January%201,%202047%20at%208:21:00AM/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, expected_revisions) mock_service.lookup_revision_by.assert_called_once_with( 1, 'refs/origin/dev', 'parsed-date') mock_utils.parse_timestamp.assert_called_once_with( 'Today is January 1, 2047 at 8:21:00AM') @patch('swh.web.ui.api.service') @istest def api_directory_through_rev_with_origin_history_with_rev_not_found_0( self, mock_service): # test both endpoint (path is useless here, still the endpoint # must be tested) for url in ['/api/1/revision' '/origin/1' '/history/4563' '/directory/', '/api/1/revision' '/origin/1' '/history/4563' '/directory/some-path/']: # given mock_service.lookup_revision_with_context_by.return_value = { 'id': 'root-rev-id'}, None # when rv = self.app.get(url) # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, { 'error': "Possibly sha1_git '%s' is not an ancestor of sha1_git_root " "'%s' sha1_git_root being the revision's identifier pointed to" " by (origin_id: %s, branch_name: %s, ts: %s)." 
% ( '4563', 'root-rev-id', 1, 'refs/heads/master', None)}) mock_service.lookup_revision_with_context_by.assert_called( 1, 'refs/heads/master', None, '4563', 100) # reset_mock mock_service.reset_mock() @patch('swh.web.ui.api.service') @istest def api_directory_through_rev_with_origin_history_rev_not_found_1( # noqa self, mock_service): # test both endpoint (path is useless here, still the endpoint # must be tested) for url in ['/api/1/revision' '/origin/10' '/branch/origin/dev' '/history/213' '/directory/', '/api/1/revision' '/origin/10' '/branch/origin/dev' '/history/213' '/directory/some/path/']: # given mock_service.lookup_revision_with_context_by.return_value = { 'id': 'root-rev-id'}, None # when rv = self.app.get(url) # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, { 'error': "Possibly sha1_git '%s' is not an ancestor of sha1_git_root " "'%s' sha1_git_root being the revision's identifier pointed to" " by (origin_id: %s, branch_name: %s, ts: %s)." 
% ( '213', 'root-rev-id', 10, 'origin/dev', None)}) mock_service.lookup_revision_with_context_by.assert_called( 10, 'origin/dev', None, '213', 100) # for next turn mock_service.reset_mock() @patch('swh.web.ui.api.utils') @patch('swh.web.ui.api.service') @istest def api_directory_through_rev_with_origin_history_rev_found_2( self, mock_service, mock_utils): # test both endpoint (path is useless here, still the endpoint # must be tested) for url in ['/api/1/revision' '/origin/100' '/branch/master' '/ts/2012-11-23' '/history/876' '/directory/', '/api/1/revision' '/origin/100' '/branch/master' '/ts/2012-11-23' '/history/876' '/directory/some-path/']: # given mock_service.lookup_revision_with_context_by.return_value = { 'id': 'root-rev-id'}, None mock_utils.parse_timestamp.return_value = '2012-11-23 00:00:00' # when rv = self.app.get(url) # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, { 'error': "Possibly sha1_git '%s' is not an ancestor of sha1_git_root " "'%s' sha1_git_root being the revision's identifier pointed to" " by (origin_id: %s, branch_name: %s, ts: %s)." 
% ( '876', 'root-rev-id', 100, 'master', '2012-11-23 00:00:00')}) mock_service.lookup_revision_with_context_by.assert_called( 100, 'master', '2012-11-23 00:00:00', '876', 100) mock_utils.parse_timestamp.assert_called_once_with('2012-11-23') # rest mock_utils.reset_mock() mock_service.reset_mock() @patch('swh.web.ui.api.utils') @patch('swh.web.ui.api.service') @istest def api_directory_through_rev_with_origin_history_rev_ok_but_sha1_rev_not( self, mock_service, mock_utils): # test both endpoint (path is useless here, still the endpoint # must be tested) for url in ['/api/1/revision' '/origin/666' '/branch/refs/master' '/ts/2016-11-23' '/history/123-sha1-git' '/directory/?limit=1000', '/api/1/revision' '/origin/666' '/branch/refs/master' '/ts/2016-11-23' '/history/123-sha1-git' '/directory/some/path/?limit=1000']: # given mock_service.lookup_revision_with_context_by.return_value = { 'id': 'root-rev-id'}, None mock_utils.parse_timestamp.return_value = '2016-11-23 00:00:00' # when rv = self.app.get(url) # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, { 'error': "Possibly sha1_git '%s' is not an ancestor of sha1_git_root " "'%s' sha1_git_root being the revision's identifier pointed to" " by (origin_id: %s, branch_name: %s, ts: %s)." 
% ('123-sha1-git', 'root-rev-id', 666, 'refs/master', '2016-11-23 00:00:00')}) mock_service.lookup_revision_with_context_by.assert_called( 666, 'refs/master', '2016-11-23 00:00:00', '123-sha1-git', 1000) mock_utils.parse_timestamp.assert_called_once_with('2016-11-23') # reset_mock mock_service.reset_mock() mock_utils.reset_mock() @patch('swh.web.ui.api._revision_directory') @patch('swh.web.ui.api.utils') @patch('swh.web.ui.api.service') @istest def api_directory_through_revision_with_origin_history( self, mock_service, mock_utils, mock_revision_directory): # given stub_root_rev = { 'id': '45-sha1-git-root' } stub_revision = { 'id': 'revision-id', } mock_service.lookup_revision_with_context_by.return_value = ( stub_root_rev, stub_revision) stub_dir_content = [ { 'type': 'dir' }, { 'type': 'file' }, ] mock_revision_directory.return_value = stub_dir_content mock_utils.parse_timestamp.return_value = '2016-11-24 00:00:00' # when url = '/api/1/revision' \ '/origin/999' \ '/branch/refs/dev' \ '/ts/2016-11-24' \ '/history/12-sha1-git' \ '/directory/some/content/' rv = self.app.get(url) # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, stub_dir_content) mock_utils.parse_timestamp.assert_called_once_with('2016-11-24') mock_service.lookup_revision_with_context_by(999, 'refs/dev', '2016-11-24 00:00:00' '12-sha1-git', 100) mock_revision_directory.assert_called_once_with('revision-id', 'some/content', url) @patch('swh.web.ui.api.service') @istest def api_history_through_revision_with_origin_rev_not_found_0( self, mock_service): mock_service.lookup_revision_with_context_by.return_value = { 'id': 'root-rev-id'}, None # when rv = self.app.get('/api/1/revision' '/origin/1' '/history/4563/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) 
self.assertEqual(response_data, { 'error': "Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s'" " sha1_git_root being the revision's identifier pointed to by " "(origin_id: %s, branch_name: %s, ts: %s)." % ('4563', 'root-rev-id', 1, 'refs/heads/master', None)}) mock_service.lookup_revision_with_context_by.assert_called_once_with( 1, 'refs/heads/master', None, '4563', 100) @patch('swh.web.ui.api.service') @istest def api_history_through_revision_with_origin_rev_not_found_1( self, mock_service): # given mock_service.lookup_revision_with_context_by.return_value = { 'id': 'root-rev-id'}, None # when rv = self.app.get('/api/1/revision' '/origin/10' '/branch/origin/dev' '/history/213/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, { 'error': "Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s'" " sha1_git_root being the revision's identifier pointed to by " "(origin_id: %s, branch_name: %s, ts: %s)." 
% ('213', 'root-rev-id', 10, 'origin/dev', None)}) mock_service.lookup_revision_with_context_by.assert_called_once_with( 10, 'origin/dev', None, '213', 100) @patch('swh.web.ui.api.utils') @patch('swh.web.ui.api.service') @istest def api_history_through_revision_with_origin_rev_not_found_2( self, mock_service, mock_utils): # given mock_service.lookup_revision_with_context_by.return_value = { 'id': 'root-rev-id'}, None mock_utils.parse_timestamp.return_value = '2012-11-23 00:00:00' # when rv = self.app.get('/api/1/revision' '/origin/100' '/branch/master' '/ts/2012-11-23' '/history/876/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, { 'error': "Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s'" " sha1_git_root being the revision's identifier pointed to by " "(origin_id: %s, branch_name: %s, ts: %s)." % ('876', 'root-rev-id', 100, 'master', '2012-11-23 00:00:00')}) mock_service.lookup_revision_with_context_by.assert_called_once_with( 100, 'master', '2012-11-23 00:00:00', '876', 100) mock_utils.parse_timestamp.assert_called_once_with('2012-11-23') @patch('swh.web.ui.api.utils') @patch('swh.web.ui.api.service') @istest def api_history_through_revision_with_origin_rev_not_found_3( self, mock_service, mock_utils): # given mock_service.lookup_revision_with_context_by.return_value = { 'id': 'root-rev-id'}, None mock_service.lookup_revision_with_context.return_value = None mock_utils.parse_timestamp.return_value = '2016-11-23 00:00:00' # when rv = self.app.get('/api/1/revision' '/origin/666' '/branch/refs/master' '/ts/2016-11-23' '/history/123-sha1-git/?limit=1000') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, { 'error': "Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s'" " 
sha1_git_root being the revision's identifier pointed to by " "(origin_id: %s, branch_name: %s, ts: %s)." % ('123-sha1-git', 'root-rev-id', 666, 'refs/master', '2016-11-23 00:00:00')}) mock_service.lookup_revision_with_context_by.assert_called_once_with( 666, 'refs/master', '2016-11-23 00:00:00', '123-sha1-git', 1000) mock_utils.parse_timestamp.assert_called_once_with('2016-11-23') mock_service.lookup_revision_with_context('456-sha1-git-root', '123-sha1-git', 1000) @patch('swh.web.ui.api._enrich_revision_with_urls') @patch('swh.web.ui.api.utils') @patch('swh.web.ui.api.service') @istest def api_history_through_revision( self, mock_service, mock_utils, mock_enrich): # given stub_root_rev = { 'id': '45-sha1-git-root' } stub_revision = { 'children': [], } mock_service.lookup_revision_with_context_by.return_value = ( stub_root_rev, stub_revision) mock_enrich.return_value = 'some-enriched-result' mock_utils.parse_timestamp.return_value = '2016-11-24 00:00:00' # when rv = self.app.get('/api/1/revision' '/origin/999' '/branch/refs/dev' '/ts/2016-11-24' '/history/12-sha1-git/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, 'some-enriched-result') mock_service.lookup_revision_with_context_by.assert_called_once_with( 999, 'refs/dev', '2016-11-24 00:00:00', '12-sha1-git', 100) mock_utils.parse_timestamp.assert_called_once_with('2016-11-24') mock_enrich.assert_called_once_with(stub_revision, context='45-sha1-git-root') @patch('swh.web.ui.api.utils') @patch('swh.web.ui.api.service') @istest def api_directory_through_revision_with_origin_rev_not_found(self, mock_service, mock_utils): mock_service.lookup_revision_by.return_value = None mock_utils.parse_timestamp.return_value = '2012-10-20 00:00:00' rv = self.app.get('/api/1/revision' '/origin/10' '/branch/refs/remote/origin/dev' '/ts/2012-10-20' '/directory/') # then 
self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, { 'error': 'Revision with (origin_id: 10, branch_name: ' 'refs/remote/origin/dev, ts: 2012-10-20 00:00:00) not found.'}) mock_service.lookup_revision_by.assert_called_once_with( 10, 'refs/remote/origin/dev', '2012-10-20 00:00:00') @patch('swh.web.ui.api.service') @patch('swh.web.ui.api._revision_directory') @istest def api_directory_through_revision_with_origin(self, mock_revision_dir, mock_service): mock_revision = { 'id': '998', } expected_res = [{ 'id': '123' }] mock_revision_dir.return_value = expected_res mock_service.lookup_revision_by.return_value = mock_revision rv = self.app.get('/api/1/revision/origin/3/directory/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, expected_res) mock_service.lookup_revision_by.assert_called_once_with( 3, 'refs/heads/master', None) mock_revision_dir.assert_called_once_with( '998', None, '/api/1/revision/origin/3/directory/') @patch('swh.web.ui.api.service') @istest def api_revision_log(self, mock_service): # given stub_revisions = [{ 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'], 'type': 'tar', 'synthetic': True, }] mock_service.lookup_revision_log.return_value = stub_revisions expected_revisions = [{ 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/', 'history_url': 
'/api/1/revision/18d8be353ed3480476f032475e7c233ef' 'f7371d5/log/', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6a' '42b7e2a44e6/', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': [ '7834ef7e7c357ce2af928115c6c6a42b7e2a4345' ], 'parent_urls': [ '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5' '/history/7834ef7e7c357ce2af928115c6c6a42b7e2a4345/' ], 'type': 'tar', 'synthetic': True, }] # when rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42' 'b7e2a44e6/log/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_revisions) mock_service.lookup_revision_log.assert_called_once_with( '8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 100) @patch('swh.web.ui.api.service') @istest def api_revision_log_not_found(self, mock_service): # given mock_service.lookup_revision_log.return_value = None # when rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42b7' 'e2a44e6/log/?limit=10') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Revision with sha1_git' ' 8834ef7e7c357ce2af928115c6c6a42b7e2a44e6 not found.'}) mock_service.lookup_revision_log.assert_called_once_with( '8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 10) @patch('swh.web.ui.api.service') @istest def api_revision_history_not_found(self, mock_service): # given mock_service.lookup_revision_with_context.return_value = None # then rv = self.app.get('/api/1/revision/999/history/338/?limit=5') 
self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') mock_service.lookup_revision_with_context.assert_called_once_with( '999', '338', 5) @istest def api_revision_history_sha1_same_so_redirect(self): # when rv = self.app.get('/api/1/revision/123/history/123?limit=10') # then self.assertEquals(rv.status_code, 301) # Ideally we'd like to be able to check the resulting url path # but does not work, this returns the current url # also following the redirect would mean to yet mock again the # destination url... So for now cannot test it # self.assertEquals(rv.location, # 'http://localhost/api/1/revision/123?limit=10') @patch('swh.web.ui.api.service') @istest def api_revision_history(self, mock_service): # for readability purposes, we use: # - sha1 as 3 letters (url are way too long otherwise to respect pep8) # - only keys with modification steps (all other keys are kept as is) # given stub_revision = { 'id': '883', 'children': ['777', '999'], 'parents': [], 'directory': '272' } mock_service.lookup_revision_with_context.return_value = stub_revision # then rv = self.app.get('/api/1/revision/666/history/883/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'id': '883', 'url': '/api/1/revision/883/', 'history_url': '/api/1/revision/883/log/', 'children': ['777', '999'], 'children_urls': ['/api/1/revision/666/history/777/', '/api/1/revision/666/history/999/'], 'parents': [], 'parent_urls': [], 'directory': '272', 'directory_url': '/api/1/directory/272/' }) mock_service.lookup_revision_with_context.assert_called_once_with( '666', '883', 100) @patch('swh.web.ui.api.service') @istest def api_directory_with_revision_not_found(self, mock_service): # given mock_service.lookup_directory_with_revision.return_value = None # then rv = self.app.get('/api/1/revision/999/directory/some/path/to/dir/') 
self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') mock_service.lookup_directory_with_revision.assert_called_once_with( '999', 'some/path/to/dir') @patch('swh.web.ui.api.service') @istest def api_directory_with_revision_not_found_2(self, mock_service): # given mock_service.lookup_directory_with_revision.return_value = None # then rv = self.app.get('/api/1/revision/123/directory/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') mock_service.lookup_directory_with_revision.assert_called_once_with( '123', None) @patch('swh.web.ui.api.service') @istest def api_directory_with_revision_ok_returns_dir_entries(self, mock_service): stub_dir = { 'type': 'dir', 'content': [ { 'sha1_git': '789', 'type': 'file', 'target': '101', 'name': 'somefile' }, { 'sha1_git': '123', 'type': 'dir', 'target': '456', 'name': 'to-subdir', } ] } - expected_dir = [ - { - 'sha1_git': '789', - 'type': 'file', - 'target': '101', - 'target_url': '/api/1/content/sha1_git:101/', - 'name': 'somefile', - 'file_url': '/api/1/revision/999/directory/some/path/somefile/' - }, - { - 'sha1_git': '123', - 'type': 'dir', - 'target': '456', - 'target_url': '/api/1/directory/456/', - 'name': 'to-subdir', - 'dir_url': '/api/1/revision/999/directory/some/path/' - 'to-subdir/', - }] + expected_dir = { + 'type': 'dir', + 'revision': '999', + 'content': [ + { + 'sha1_git': '789', + 'type': 'file', + 'target': '101', + 'target_url': '/api/1/content/sha1_git:101/', + 'name': 'somefile', + 'file_url': '/api/1/revision/999/directory/some/path/' + 'somefile/' + }, + { + 'sha1_git': '123', + 'type': 'dir', + 'target': '456', + 'target_url': '/api/1/directory/456/', + 'name': 'to-subdir', + 'dir_url': '/api/1/revision/999/directory/some/path/' + 'to-subdir/', + }] + } # given mock_service.lookup_directory_with_revision.return_value = stub_dir # then rv = self.app.get('/api/1/revision/999/directory/some/path/') self.assertEquals(rv.status_code, 
200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_dir) mock_service.lookup_directory_with_revision.assert_called_once_with( '999', 'some/path') @patch('swh.web.ui.api.service') @istest def api_directory_with_revision_ok_returns_content(self, mock_service): stub_content = { 'type': 'file', 'content': { 'sha1_git': '789', 'sha1': '101', } } expected_content = { - 'sha1_git': '789', - 'sha1': '101', - 'data_url': '/api/1/content/101/raw/', + 'type': 'file', + 'revision': '999', + 'content': { + 'sha1_git': '789', + 'sha1': '101', + 'data_url': '/api/1/content/101/raw/', + } } # given mock_service.lookup_directory_with_revision.return_value = stub_content # then rv = self.app.get('/api/1/revision/999/directory/some/path/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_content) mock_service.lookup_directory_with_revision.assert_called_once_with( '999', 'some/path') @istest def api_directory_revision_history_sha1_same_so_redirect(self): # when rv = self.app.get( '/api/1/revision/123/history/123/directory/path/to/?limit=1') # then self.assertEquals(rv.status_code, 301) # self.assertEquals(rv.location, # 'http://localhost/api/1/revision/123/directory/path/to/') @patch('swh.web.ui.api.service') @istest def api_directory_revision_history_ko_revision_not_found(self, mock_service): # given mock_service.lookup_revision_with_context.return_value = None # then rv = self.app.get('/api/1/revision/456/history/987/directory/path/to/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': "Possibly sha1_git '987' is not " + "an ancestor of sha1_git_root '456'"}) 
mock_service.lookup_revision_with_context.assert_called_once_with( '456', '987', 100) @patch('swh.web.ui.api.service') @istest def api_directory_revision_history(self, mock_service): # given mock_service.lookup_revision_with_context.return_value = { 'id': 'rev-id' } stub_dir = { 'type': 'dir', 'content': [ { 'sha1_git': '879', 'type': 'file', 'target': '110', 'name': 'subfile' }, { 'sha1_git': '213', 'type': 'dir', 'target': '546', 'name': 'subdir', } ] } - expected_dir = [ - { - 'sha1_git': '879', - 'type': 'file', - 'target': '110', - 'target_url': '/api/1/content/sha1_git:110/', - 'name': 'subfile', - 'file_url': '/api/1/revision/354/history/867/directory/debian/' - 'subfile/', - }, - { - 'sha1_git': '213', - 'type': 'dir', - 'target': '546', - 'target_url': '/api/1/directory/546/', - 'name': 'subdir', - 'dir_url': - '/api/1/revision/354/history/867/directory/debian/subdir/' - }] + expected_dir = { + 'type': 'dir', + 'revision': 'rev-id', + 'content': [ + { + 'sha1_git': '879', + 'type': 'file', + 'target': '110', + 'target_url': '/api/1/content/sha1_git:110/', + 'name': 'subfile', + 'file_url': '/api/1/revision/354/history/867/directory/' + 'debian/' + 'subfile/', + }, + { + 'sha1_git': '213', + 'type': 'dir', + 'target': '546', + 'target_url': '/api/1/directory/546/', + 'name': 'subdir', + 'dir_url': + '/api/1/revision/354/history/867/directory/debian/subdir/' + }] + } # given mock_service.lookup_directory_with_revision.return_value = stub_dir # then rv = self.app.get('/api/1/revision/354' '/history/867' '/directory/debian/?limit=4') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_dir) mock_service.lookup_revision_with_context.assert_called_once_with( '354', '867', 4) mock_service.lookup_directory_with_revision('rev-id', 'debian') @patch('swh.web.ui.api.service') @istest def api_person(self, mock_service): # given stub_person 
= { 'id': '198003', 'name': 'Software Heritage', 'email': 'robot@softwareheritage.org', } mock_service.lookup_person.return_value = stub_person # when rv = self.app.get('/api/1/person/198003/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_person) @patch('swh.web.ui.api.service') @istest def api_person_not_found(self, mock_service): # given mock_service.lookup_person.return_value = None # when rv = self.app.get('/api/1/person/666/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Person with id 666 not found.'}) @patch('swh.web.ui.api.service') @istest def api_directory(self, mock_service): # given stub_directories = [ { 'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5', 'type': 'file', 'target': '4568be353ed3480476f032475e7c233eff737123', }, { 'sha1_git': '1d518d8be353ed3480476f032475e7c233eff737', 'type': 'dir', 'target': '8be353ed3480476f032475e7c233eff737123456', }] expected_directories = [ { 'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5', 'type': 'file', 'target': '4568be353ed3480476f032475e7c233eff737123', 'target_url': '/api/1/content/' 'sha1_git:4568be353ed3480476f032475e7c233eff737123/', }, { 'sha1_git': '1d518d8be353ed3480476f032475e7c233eff737', 'type': 'dir', 'target': '8be353ed3480476f032475e7c233eff737123456', 'target_url': '/api/1/directory/8be353ed3480476f032475e7c233eff737123456/', }] mock_service.lookup_directory.return_value = stub_directories # when rv = self.app.get('/api/1/directory/' '18d8be353ed3480476f032475e7c233eff7371d5/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_directories) 
mock_service.lookup_directory.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5') @patch('swh.web.ui.api.service') @istest def api_directory_not_found(self, mock_service): # given mock_service.lookup_directory.return_value = [] # when rv = self.app.get('/api/1/directory/' '66618d8be353ed3480476f032475e7c233eff737/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Directory with sha1_git ' '66618d8be353ed3480476f032475e7c233eff737 not found.'}) @patch('swh.web.ui.api.service') @istest def api_lookup_entity_by_uuid_not_found(self, mock_service): # when mock_service.lookup_entity_by_uuid.return_value = [] # when rv = self.app.get('/api/1/entity/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': "Entity with uuid '5f4d4c51-498a-4e28-88b3-b3e4e8396cba' not " + "found."}) mock_service.lookup_entity_by_uuid.assert_called_once_with( '5f4d4c51-498a-4e28-88b3-b3e4e8396cba') @patch('swh.web.ui.api.service') @istest def api_lookup_entity_by_uuid_bad_request(self, mock_service): # when mock_service.lookup_entity_by_uuid.side_effect = BadInputExc( 'bad input: uuid malformed!') # when rv = self.app.get('/api/1/entity/uuid malformed/') self.assertEquals(rv.status_code, 400) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'bad input: uuid malformed!'}) mock_service.lookup_entity_by_uuid.assert_called_once_with( 'uuid malformed') @patch('swh.web.ui.api.service') @istest def api_lookup_entity_by_uuid(self, mock_service): # when stub_entities = [ { 'uuid': '34bd6b1b-463f-43e5-a697-785107f598e4', 'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2' }, { 'uuid': 
'aee991a0-f8d7-4295-a201-d1ce2efc9fb2' } ] mock_service.lookup_entity_by_uuid.return_value = stub_entities expected_entities = [ { 'uuid': '34bd6b1b-463f-43e5-a697-785107f598e4', 'uuid_url': '/api/1/entity/34bd6b1b-463f-43e5-a697-' '785107f598e4/', 'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2', 'parent_url': '/api/1/entity/aee991a0-f8d7-4295-a201-' 'd1ce2efc9fb2/' }, { 'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2', 'uuid_url': '/api/1/entity/aee991a0-f8d7-4295-a201-' 'd1ce2efc9fb2/' } ] # when rv = self.app.get('/api/1/entity' '/34bd6b1b-463f-43e5-a697-785107f598e4/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_entities) mock_service.lookup_entity_by_uuid.assert_called_once_with( '34bd6b1b-463f-43e5-a697-785107f598e4') class ApiUtils(unittest.TestCase): @istest def api_lookup_not_found(self): # when with self.assertRaises(exc.NotFoundExc) as e: api._api_lookup('something', lambda x: None, 'this is the error message raised as it is None') self.assertEqual(e.exception.args[0], 'this is the error message raised as it is None') @istest def api_lookup_with_result(self): # when actual_result = api._api_lookup('something', lambda x: x + '!', 'this is the error which won\'t be ' 'used here') self.assertEqual(actual_result, 'something!') @istest def api_lookup_with_result_as_map(self): # when actual_result = api._api_lookup([1, 2, 3], lambda x: map(lambda y: y+1, x), 'this is the error which won\'t be ' 'used here') self.assertEqual(actual_result, [2, 3, 4]) diff --git a/swh/web/ui/tests/test_utils.py b/swh/web/ui/tests/test_utils.py index 8cd5032f..04af353e 100644 --- a/swh/web/ui/tests/test_utils.py +++ b/swh/web/ui/tests/test_utils.py @@ -1,383 +1,416 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 
3, or any later version # See top-level LICENSE file for more information import datetime import dateutil import unittest from unittest.mock import patch, call from nose.tools import istest from swh.web.ui import utils class UtilsTestCase(unittest.TestCase): def setUp(self): self.url_map = [dict(rule='/other/', methods=set(['GET', 'POST', 'HEAD']), endpoint='foo'), dict(rule='/some/old/url/', methods=set(['GET', 'POST']), endpoint='blablafn'), dict(rule='/other/old/url/', methods=set(['GET', 'HEAD']), endpoint='bar'), dict(rule='/other', methods=set([]), endpoint=None), dict(rule='/other2', methods=set([]), endpoint=None)] @istest def filter_endpoints_1(self): # when actual_data = utils.filter_endpoints(self.url_map, '/some') # then self.assertEquals(actual_data, { '/some/old/url/': { 'methods': ['GET', 'POST'], 'endpoint': 'blablafn' } }) @istest def filter_endpoints_2(self): # when actual_data = utils.filter_endpoints(self.url_map, '/other', blacklist=['/other2']) # then # rules /other is skipped because its' exactly the prefix url # rules /other2 is skipped because it's blacklisted self.assertEquals(actual_data, { '/other/': { 'methods': ['GET', 'HEAD', 'POST'], 'endpoint': 'foo' }, '/other/old/url/': { 'methods': ['GET', 'HEAD'], 'endpoint': 'bar' } }) @patch('swh.web.ui.utils.flask') @istest def prepare_directory_listing(self, mock_flask): # given def mock_url_for(url_key, **kwds): if url_key == 'browse_directory': sha1_git = kwds['sha1_git'] return '/path/to/url/dir' + '/' + sha1_git else: sha1_git = kwds['q'] return '/path/to/url/file' + '/' + sha1_git mock_flask.url_for.side_effect = mock_url_for inputs = [{'type': 'dir', 'target': '123', 'name': 'some-dir-name'}, {'type': 'file', 'sha1': '654', 'name': 'some-filename'}, {'type': 'dir', 'target': '987', 'name': 'some-other-dirname'}] expected_output = [{'link': '/path/to/url/dir/123', 'name': 'some-dir-name', 'type': 'dir'}, {'link': '/path/to/url/file/654', 'name': 'some-filename', 'type': 'file'}, 
{'link': '/path/to/url/dir/987', 'name': 'some-other-dirname', 'type': 'dir'}] # when actual_outputs = utils.prepare_directory_listing(inputs) # then self.assertEquals(actual_outputs, expected_output) mock_flask.url_for.assert_has_calls([ call('browse_directory', sha1_git='123'), call('browse_content_data', q='654'), call('browse_directory', sha1_git='987'), ]) @patch('swh.web.ui.utils.flask') @istest def prepare_directory_listing_with_revision(self, mock_flask): # given def mock_url_for(url_key, **kwds): if url_key == 'browse_revision_directory': sha1_git = kwds['sha1_git'] path = kwds['path'] return '/browse/revision/' + sha1_git + '/directory/' + path mock_flask.url_for.side_effect = mock_url_for inputs = [{'type': 'dir', 'target': '123', 'name': 'some-dir-name'}, {'type': 'file', 'sha1': '654', 'name': 'some-filename'}, {'type': 'dir', 'target': '987', 'name': 'some-other-dirname'}] expected_output = [{'link': '/browse/revision/revsha1/directory/' 'some/path/some-dir-name', 'name': 'some-dir-name', 'type': 'dir'}, {'link': '/browse/revision/revsha1/directory/' 'some/path/some-filename', 'name': 'some-filename', 'type': 'file'}, {'link': '/browse/revision/revsha1/directory/' 'some/path/some-other-dirname', 'name': 'some-other-dirname', 'type': 'dir'}] # when actual_outputs = utils.prepare_directory_listing_with_revision( 'revsha1', 'some/path', inputs) # then self.assertEquals(actual_outputs, expected_output) mock_flask.url_for.assert_has_calls([ call('browse_revision_directory', sha1_git='revsha1', path='some/path/some-dir-name'), call('browse_revision_directory', sha1_git='revsha1', path='some/path/some-filename'), call('browse_revision_directory', sha1_git='revsha1', path='some/path/some-other-dirname'), ]) @patch('swh.web.ui.utils.flask') @istest def prepare_revision_view(self, mock_flask): # given def mock_url_for(url_key, **kwds): if url_key == 'browse_directory': return '/browse/directory/' + kwds['sha1_git'] elif url_key == 'browse_revision': return 
'/browse/revision/' + kwds['sha1_git'] mock_flask.url_for.side_effect = mock_url_for rev_input = { 'id': 'd770e558e21961ad6cfdf0ff7df0eb5d7d4f0754', 'date': 'Sun, 05 Jul 2015 18:01:52 GMT', 'committer': { 'email': 'torvalds@linux-foundation.org', 'name': 'Linus Torvalds' }, 'type': 'git', 'author': { 'email': 'torvalds@linux-foundation.org', 'name': 'Linus Torvalds' }, 'message': 'Linux 4.2-rc1\n', 'synthetic': False, 'directory': '2a1dbabeed4dcf1f4a4c441993b2ffc9d972780b', 'parents': [ 'a585d2b738bfa26326b3f1f40f0f1eda0c067ccf' ], 'children': [ 'b696e2b738bfa26326b3f1f40f0f1eda0c067ccf' ], } expected_rev = { 'id': 'd770e558e21961ad6cfdf0ff7df0eb5d7d4f0754', 'date': 'Sun, 05 Jul 2015 18:01:52 GMT', 'committer': 'Linus Torvalds ', 'type': 'git', 'author': 'Linus Torvalds ', 'message': 'Linux 4.2-rc1\n', 'synthetic': False, 'directory': '/browse/directory/' '2a1dbabeed4dcf1f4a4c441993b2ffc9d972780b', 'parents': [ '/browse/revision/a585d2b738bfa26326b3f1f40f0f1eda0c067ccf' ], 'children': [ '/browse/revision/b696e2b738bfa26326b3f1f40f0f1eda0c067ccf' ], } # when actual_rev = utils.prepare_revision_view(rev_input) # then self.assertEquals(actual_rev, expected_rev) mock_flask.url_for.assert_has_calls([ call('browse_revision', sha1_git='a585d2b738bfa26326b3f1f40f0f1eda0c067ccf'), call('browse_revision', sha1_git='b696e2b738bfa26326b3f1f40f0f1eda0c067ccf'), call('browse_directory', sha1_git='2a1dbabeed4dcf1f4a4c441993b2ffc9d972780b'), ]) @patch('swh.web.ui.utils.flask') @istest def prepare_directory_listing_with_revision_history(self, mock_flask): # given def mock_url_for(url_key, **kwds): if url_key == 'browse_revision_history_directory': sha1_git_root = kwds['sha1_git_root'] sha1_git = kwds['sha1_git'] path = kwds['path'] return '/browse/revision/' + sha1_git_root + \ '/history/' + sha1_git + '/directory/' + path mock_flask.url_for.side_effect = mock_url_for inputs = [{'type': 'dir', 'target': '123', 'name': 'some-dir-name'}, {'type': 'file', 'sha1': '654', 'name': 
'some-filename'}, {'type': 'dir', 'target': '987', 'name': 'some-other-dirname'}] expected_output = [{'link': '/browse/revision/revsha1root/history/' 'revsha1/directory/' 'some/path/some-dir-name', 'name': 'some-dir-name', 'type': 'dir'}, {'link': '/browse/revision/revsha1root/history/' 'revsha1/directory/' 'some/path/some-filename', 'name': 'some-filename', 'type': 'file'}, {'link': '/browse/revision/revsha1root/history/' 'revsha1/directory/' 'some/path/some-other-dirname', 'name': 'some-other-dirname', 'type': 'dir'}] # when actual_outputs = utils.prepare_directory_listing_with_revision_history( 'revsha1root', 'revsha1', 'some/path', inputs) # then self.assertEquals(actual_outputs, expected_output) mock_flask.url_for.assert_has_calls([ call('browse_revision_history_directory', sha1_git_root='revsha1root', sha1_git='revsha1', path='some/path/some-dir-name'), call('browse_revision_history_directory', sha1_git_root='revsha1root', sha1_git='revsha1', path='some/path/some-filename'), call('browse_revision_history_directory', sha1_git_root='revsha1root', sha1_git='revsha1', path='some/path/some-other-dirname'), ]) + @istest + def prepare_data_for_view(self): + # given + inputs = [ + { + 'data': 1, + 'data_url': '/api/1/some/api/call', + }, + { + 'blah': 'foobar', + 'blah_url': '/some/non/changed/api/call' + }] + + # when + actual_result = utils.prepare_data_for_view(inputs) + + # then + self.assertEquals(actual_result, [ + { + 'data': 1, + 'data_url': '/browse/some/api/call', + }, + { + 'blah': 'foobar', + 'blah_url': '/some/non/changed/api/call' + } + ]) + @istest def filter_field_keys_dict_unknown_keys(self): # when actual_res = utils.filter_field_keys( {'directory': 1, 'file': 2, 'link': 3}, {'directory1', 'file2'}) # then self.assertEqual(actual_res, {}) @istest def filter_field_keys_dict(self): # when actual_res = utils.filter_field_keys( {'directory': 1, 'file': 2, 'link': 3}, {'directory', 'link'}) # then self.assertEqual(actual_res, {'directory': 1, 'link': 3}) 
@istest def filter_field_keys_list_unknown_keys(self): # when actual_res = utils.filter_field_keys( [{'directory': 1, 'file': 2, 'link': 3}, {'1': 1, '2': 2, 'link': 3}], {'d'}) # then self.assertEqual(actual_res, [{}, {}]) @istest def filter_field_keys_list(self): # when actual_res = utils.filter_field_keys( [{'directory': 1, 'file': 2, 'link': 3}, {'dir': 1, 'fil': 2, 'lin': 3}], {'directory', 'dir'}) # then self.assertEqual(actual_res, [{'directory': 1}, {'dir': 1}]) @istest def filter_field_keys_other(self): # given input_set = {1, 2} # when actual_res = utils.filter_field_keys(input_set, {'a', '1'}) # then self.assertEqual(actual_res, input_set) @istest def fmap(self): self.assertEquals([2, 3, 4], utils.fmap(lambda x: x+1, [1, 2, 3])) + self.assertEquals([11, 12, 13], + list(utils.fmap(lambda x: x+10, + map(lambda x: x, [1, 2, 3])))) self.assertEquals({'a': 2, 'b': 4}, utils.fmap(lambda x: x*2, {'a': 1, 'b': 2})) self.assertEquals(100, utils.fmap(lambda x: x*10, 10)) + self.assertEquals({'a': [2, 6], 'b': 4}, + utils.fmap(lambda x: x*2, {'a': [1, 3], 'b': 2})) @istest def person_to_string(self): self.assertEqual(utils.person_to_string(dict(name='raboof', email='foo@bar')), 'raboof ') @istest def parse_timestamp(self): input_timestamps = [ '2016-01-12', '2016-01-12T09:19:12+0100', 'Today is January 1, 2047 at 8:21:00AM', '1452591542', ] output_dates = [ datetime.datetime(2016, 1, 12, 0, 0), datetime.datetime(2016, 1, 12, 9, 19, 12, tzinfo=dateutil.tz.tzoffset(None, 3600)), datetime.datetime(2047, 1, 1, 8, 21), datetime.datetime(2016, 1, 12, 10, 39, 2), ] for ts, exp_date in zip(input_timestamps, output_dates): self.assertEquals(utils.parse_timestamp(ts), exp_date) diff --git a/swh/web/ui/tests/test_views.py b/swh/web/ui/tests/test_views.py index 0e04a988..bbe74762 100644 --- a/swh/web/ui/tests/test_views.py +++ b/swh/web/ui/tests/test_views.py @@ -1,1330 +1,1405 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level 
directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from nose.tools import istest from swh.web.ui.tests import test_app from unittest.mock import patch from swh.web.ui.exc import BadInputExc, NotFoundExc class FileMock(): def __init__(self, filename): self.filename = filename class ViewTestCase(test_app.SWHViewTestCase): render_template = False @patch('swh.web.ui.views.flask') @istest def homepage(self, mock_flask): # given mock_flask.flash.return_value = 'something' # when rv = self.client.get('/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('home.html') mock_flask.flash.assert_called_once_with( 'This Web app is still work in progress, use at your own risk', 'warning') @istest def info(self): # when rv = self.client.get('/about/') self.assertEquals(rv.status_code, 200) self.assert_template_used('about.html') self.assertIn(b'About', rv.data) @istest def search_default(self): # when rv = self.client.get('/search/') self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('q'), '') self.assertEqual(self.get_context_variable('messages'), []) self.assertEqual(self.get_context_variable('filename'), None) self.assertEqual(self.get_context_variable('file'), None) self.assert_template_used('upload_and_search.html') @patch('swh.web.ui.views.service') @istest def search_get_query_hash_not_found(self, mock_service): # given mock_service.lookup_hash.return_value = {'found': None} # when rv = self.client.get('/search/?q=sha1:456') self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('q'), 'sha1:456') self.assertEqual(self.get_context_variable('messages'), ['Content with hash sha1:456 not found!']) self.assertEqual(self.get_context_variable('filename'), None) self.assertEqual(self.get_context_variable('file'), None) self.assert_template_used('upload_and_search.html') 
mock_service.lookup_hash.assert_called_once_with('sha1:456') @patch('swh.web.ui.views.service') @istest def search_get_query_hash_bad_input(self, mock_service): # given mock_service.lookup_hash.side_effect = BadInputExc('error msg') # when rv = self.client.get('/search/?q=sha1_git:789') self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('q'), 'sha1_git:789') self.assertEqual(self.get_context_variable('messages'), ['error msg']) self.assertEqual(self.get_context_variable('filename'), None) self.assertEqual(self.get_context_variable('file'), None) self.assert_template_used('upload_and_search.html') mock_service.lookup_hash.assert_called_once_with('sha1_git:789') @patch('swh.web.ui.views.service') @istest def search_get_query_hash_found(self, mock_service): # given mock_service.lookup_hash.return_value = {'found': True} # when rv = self.client.get('/search/?q=sha1:123') self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('q'), 'sha1:123') self.assertEqual(self.get_context_variable('messages'), ['Content with hash sha1:123 found!']) self.assertEqual(self.get_context_variable('filename'), None) self.assertEqual(self.get_context_variable('file'), None) self.assert_template_used('upload_and_search.html') mock_service.lookup_hash.assert_called_once_with('sha1:123') @patch('swh.web.ui.views.service') @istest def search_post_query_hash_not_found(self, mock_service): # given mock_service.lookup_hash.return_value = {'found': None} # when rv = self.client.get('/search/?q=sha1:456') self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('q'), 'sha1:456') self.assertEqual(self.get_context_variable('messages'), ['Content with hash sha1:456 not found!']) self.assertEqual(self.get_context_variable('filename'), None) self.assertEqual(self.get_context_variable('file'), None) self.assert_template_used('upload_and_search.html') mock_service.lookup_hash.assert_called_once_with('sha1:456') 
@patch('swh.web.ui.views.service') @istest def search_post_query_hash_bad_input(self, mock_service): # given mock_service.lookup_hash.side_effect = BadInputExc('error msg!') # when rv = self.client.post('/search/', data=dict(q='sha1_git:987')) self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('q'), 'sha1_git:987') self.assertEqual(self.get_context_variable('messages'), ['error msg!']) self.assertEqual(self.get_context_variable('filename'), None) self.assertEqual(self.get_context_variable('file'), None) self.assert_template_used('upload_and_search.html') mock_service.lookup_hash.assert_called_once_with('sha1_git:987') @patch('swh.web.ui.views.service') @istest def search_post_query_hash_found(self, mock_service): # given mock_service.lookup_hash.return_value = {'found': True} # when rv = self.client.post('/search/', data=dict(q='sha1:321')) self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('q'), 'sha1:321') self.assertEqual(self.get_context_variable('messages'), ['Content with hash sha1:321 found!']) self.assertEqual(self.get_context_variable('filename'), None) self.assertEqual(self.get_context_variable('file'), None) self.assert_template_used('upload_and_search.html') mock_service.lookup_hash.assert_called_once_with('sha1:321') @patch('swh.web.ui.views.service') @patch('swh.web.ui.views.request') @istest def search_post_upload_and_hash_bad_input(self, mock_request, mock_service): # given mock_request.data = {} mock_request.method = 'POST' mock_request.files = dict(filename=FileMock('foobar')) mock_service.upload_and_search.side_effect = BadInputExc( 'error bad input') # when (mock_request completes the post request) rv = self.client.post('/search/') # then self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('messages'), ['error bad input']) self.assert_template_used('upload_and_search.html') mock_service.upload_and_search.called = True @patch('swh.web.ui.views.service') 
@patch('swh.web.ui.views.request') @istest def search_post_upload_and_hash_not_found(self, mock_request, mock_service): # given mock_request.data = {} mock_request.method = 'POST' mock_request.files = dict(filename=FileMock('foobar')) mock_service.upload_and_search.return_value = {'filename': 'foobar', 'sha1': 'blahhash', 'found': False} # when (mock_request completes the post request) rv = self.client.post('/search/') # then self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('messages'), ["File foobar with hash blahhash not found!"]) self.assertEqual(self.get_context_variable('filename'), 'foobar') self.assertEqual(self.get_context_variable('sha1'), 'blahhash') self.assert_template_used('upload_and_search.html') mock_service.upload_and_search.called = True @patch('swh.web.ui.views.service') @patch('swh.web.ui.views.request') @istest def search_post_upload_and_hash_found(self, mock_request, mock_service): # given mock_request.data = {} mock_request.method = 'POST' mock_request.files = dict(filename=FileMock('foobar')) mock_service.upload_and_search.return_value = {'filename': 'foobar', 'sha1': '123456789', 'found': True} # when (mock_request completes the post request) rv = self.client.post('/search/') # then self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('messages'), ["File foobar with hash 123456789 found!"]) self.assertEqual(self.get_context_variable('filename'), 'foobar') self.assertEqual(self.get_context_variable('sha1'), '123456789') self.assert_template_used('upload_and_search.html') mock_service.upload_and_search.called = True @patch('swh.web.ui.views.service') @istest def browse_content_detail_not_found(self, mock_service): # given mock_service.lookup_content.return_value = None # when rv = self.client.get('/browse/content/sha1:sha1-hash/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('content.html') self.assertEqual(self.get_context_variable('message'), 'Content 
with sha1:sha1-hash not found.') self.assertEqual(self.get_context_variable('content'), None) mock_service.lookup_content.assert_called_once_with( 'sha1:sha1-hash') @patch('swh.web.ui.views.service') @istest def browse_content_detail_bad_input(self, mock_service): # given mock_service.lookup_content.side_effect = BadInputExc('Bad input!') # when rv = self.client.get('/browse/content/sha1:sha1-hash/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('content.html') self.assertEqual(self.get_context_variable('message'), 'Bad input!') self.assertIsNone(self.get_context_variable('content')) mock_service.lookup_content.assert_called_once_with( 'sha1:sha1-hash') @patch('swh.web.ui.views.service') @istest def browse_content_detail(self, mock_service): # given stub_content = {'sha1': 'sha1_hash'} mock_service.lookup_content.return_value = stub_content # when rv = self.client.get('/browse/content/sha1:sha1-hash/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('content.html') self.assertIsNone(self.get_context_variable('message')) self.assertEqual(self.get_context_variable('content'), {'sha1': 'sha1_hash'}) mock_service.lookup_content.assert_called_once_with( 'sha1:sha1-hash') @patch('swh.web.ui.views.service') @istest def browse_content_data(self, mock_service): # given stub_content_raw = { 'sha1': 'sha1-hash', 'data': b'some-data' } mock_service.lookup_content_raw.return_value = stub_content_raw # when rv = self.client.get('/browse/content/sha1:sha1-hash/raw/') self.assertEquals(rv.status_code, 200) self.assert_template_used('content-data.html') self.assertEqual(self.get_context_variable('message'), 'Content sha1-hash') self.assertEqual(self.get_context_variable('content'), stub_content_raw) mock_service.lookup_content_raw.assert_called_once_with( 'sha1:sha1-hash') @patch('swh.web.ui.views.service') @istest def browse_content_data_not_found(self, mock_service): # given mock_service.lookup_content_raw.return_value = None # 
when rv = self.client.get('/browse/content/sha1:sha1-unknown/raw/') self.assertEquals(rv.status_code, 200) self.assert_template_used('content-data.html') self.assertEqual(self.get_context_variable('message'), 'Content with sha1:sha1-unknown not found.') self.assertEqual(self.get_context_variable('content'), None) mock_service.lookup_content_raw.assert_called_once_with( 'sha1:sha1-unknown') @patch('swh.web.ui.views.service') @istest def browse_content_data_invalid_hash(self, mock_service): # given mock_service.lookup_content_raw.side_effect = BadInputExc( 'Invalid hash') # when rv = self.client.get('/browse/content/sha2:sha1-invalid/raw/') self.assertEquals(rv.status_code, 200) self.assert_template_used('content-data.html') self.assertEqual(self.get_context_variable('message'), 'Invalid hash') self.assertEqual(self.get_context_variable('content'), None) mock_service.lookup_content_raw.assert_called_once_with( 'sha2:sha1-invalid') @patch('swh.web.ui.views.service') @patch('swh.web.ui.utils') @istest def browse_directory_bad_input(self, mock_utils, mock_service): # given mock_service.lookup_directory.side_effect = BadInputExc('Invalid hash') # when rv = self.client.get('/browse/directory/sha2-invalid/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('directory.html') self.assertEqual(self.get_context_variable('message'), 'Invalid hash') self.assertEqual(self.get_context_variable('files'), []) mock_service.lookup_directory.assert_called_once_with( 'sha2-invalid') @patch('swh.web.ui.views.service') @patch('swh.web.ui.utils') @istest def browse_directory_empty_result(self, mock_utils, mock_service): # given mock_service.lookup_directory.return_value = None # when rv = self.client.get('/browse/directory/some-sha1/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('directory.html') self.assertEqual(self.get_context_variable('message'), 'Directory some-sha1 not found.') self.assertEqual(self.get_context_variable('files'), []) 
mock_service.lookup_directory.assert_called_once_with( 'some-sha1') @patch('swh.web.ui.views.service') @patch('swh.web.ui.views.utils') @istest def browse_directory(self, mock_utils, mock_service): # given stub_directory_ls = [ {'type': 'dir', 'target': '123', 'name': 'some-dir-name'}, {'type': 'file', 'sha1': '654', 'name': 'some-filename'}, {'type': 'dir', 'target': '987', 'name': 'some-other-dirname'} ] mock_service.lookup_directory.return_value = stub_directory_ls stub_directory_map = [ {'link': '/path/to/url/dir/123', 'name': 'some-dir-name'}, {'link': '/path/to/url/file/654', 'name': 'some-filename'}, {'link': '/path/to/url/dir/987', 'name': 'some-other-dirname'} ] mock_utils.prepare_directory_listing.return_value = stub_directory_map # when rv = self.client.get('/browse/directory/some-sha1/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('directory.html') self.assertEqual(self.get_context_variable('message'), 'Listing for directory some-sha1:') self.assertEqual(self.get_context_variable('files'), stub_directory_map) mock_service.lookup_directory.assert_called_once_with( 'some-sha1') mock_utils.prepare_directory_listing.assert_called_once_with( stub_directory_ls) @patch('swh.web.ui.views.service') # @istest def browse_content_with_origin_content_not_found(self, mock_service): # given mock_service.lookup_hash.return_value = {'found': False} # when rv = self.client.get('/browse/content/sha256:some-sha256/origin/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('content-with-origin.html') self.assertEqual(self.get_context_variable('message'), 'Hash sha256:some-sha256 was not found.') mock_service.lookup_hash.assert_called_once_with( 'sha256:some-sha256') mock_service.lookup_hash_origin.called = False @patch('swh.web.ui.views.service') # @istest def browse_content_with_origin_bad_input(self, mock_service): # given mock_service.lookup_hash.side_effect = BadInputExc('Invalid hash') # when rv = 
self.client.get('/browse/content/sha256:some-sha256/origin/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('content-with-origin.html') self.assertEqual( self.get_context_variable('message'), 'Invalid hash') mock_service.lookup_hash.assert_called_once_with( 'sha256:some-sha256') mock_service.lookup_hash_origin.called = False @patch('swh.web.ui.views.service') # @istest def browse_content_with_origin(self, mock_service): # given mock_service.lookup_hash.return_value = {'found': True} mock_service.lookup_hash_origin.return_value = { 'origin_type': 'ftp', 'origin_url': '/some/url', 'revision': 'revision-hash', 'branch': 'master', 'path': '/path/to', } # when rv = self.client.get('/browse/content/sha256:some-sha256/origin/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('content-with-origin.html') self.assertEqual( self.get_context_variable('message'), "The content with hash sha256:some-sha256 has been seen on " + "origin with type 'ftp'\n" + "at url '/some/url'. 
The revision was identified at " + "'revision-hash' on branch 'master'.\n" + "The file's path referenced was '/path/to'.") mock_service.lookup_hash.assert_called_once_with( 'sha256:some-sha256') mock_service.lookup_hash_origin.assert_called_once_with( 'sha256:some-sha256') @patch('swh.web.ui.views.service') @istest def browse_origin_not_found(self, mock_service): # given mock_service.lookup_origin.return_value = None # when rv = self.client.get('/browse/origin/1/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('origin.html') self.assertEqual(self.get_context_variable('origin_id'), 1) self.assertEqual( self.get_context_variable('message'), 'Origin 1 not found!') mock_service.lookup_origin.assert_called_once_with(1) @patch('swh.web.ui.views.service') @istest def browse_origin_found(self, mock_service): # given mock_origin = {'type': 'git', 'lister': None, 'project': None, 'url': 'rsync://some/url', 'id': 426} mock_service.lookup_origin.return_value = mock_origin # when rv = self.client.get('/browse/origin/426/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('origin.html') self.assertEqual(self.get_context_variable('origin_id'), 426) self.assertEqual(self.get_context_variable('origin'), mock_origin) mock_service.lookup_origin.assert_called_once_with(426) @patch('swh.web.ui.views.service') @istest def browse_origin_bad_input(self, mock_service): # given mock_service.lookup_origin.side_effect = BadInputExc('wrong input') # when rv = self.client.get('/browse/origin/426/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('origin.html') self.assertEqual(self.get_context_variable('origin_id'), 426) mock_service.lookup_origin.assert_called_once_with(426) @patch('swh.web.ui.views.service') @istest def browse_person_not_found(self, mock_service): # given mock_service.lookup_person.return_value = None # when rv = self.client.get('/browse/person/1/') # then self.assertEquals(rv.status_code, 200) 
self.assert_template_used('person.html') self.assertEqual(self.get_context_variable('person_id'), 1) self.assertEqual( self.get_context_variable('message'), 'Person 1 not found!') mock_service.lookup_person.assert_called_once_with(1) @patch('swh.web.ui.views.service') @istest def browse_person_found(self, mock_service): # given mock_person = {'type': 'git', 'lister': None, 'project': None, 'url': 'rsync://some/url', 'id': 426} mock_service.lookup_person.return_value = mock_person # when rv = self.client.get('/browse/person/426/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('person.html') self.assertEqual(self.get_context_variable('person_id'), 426) self.assertEqual(self.get_context_variable('person'), mock_person) mock_service.lookup_person.assert_called_once_with(426) @patch('swh.web.ui.views.service') @istest def browse_person_bad_input(self, mock_service): # given mock_service.lookup_person.side_effect = BadInputExc('wrong input') # when rv = self.client.get('/browse/person/426/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('person.html') self.assertEqual(self.get_context_variable('person_id'), 426) mock_service.lookup_person.assert_called_once_with(426) @patch('swh.web.ui.views.service') @istest def browse_release_not_found(self, mock_service): # given mock_service.lookup_release.return_value = None # when rv = self.client.get('/browse/release/1/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('release.html') self.assertEqual(self.get_context_variable('sha1_git'), '1') self.assertEqual( self.get_context_variable('message'), 'Release 1 not found!') mock_service.lookup_release.assert_called_once_with('1') @patch('swh.web.ui.views.service') @istest def browse_release_bad_input(self, mock_service): # given mock_service.lookup_release.side_effect = BadInputExc('wrong input') # when rv = self.client.get('/browse/release/426/') # then self.assertEquals(rv.status_code, 200) 
self.assert_template_used('release.html') self.assertEqual(self.get_context_variable('sha1_git'), '426') mock_service.lookup_release.assert_called_once_with('426') @patch('swh.web.ui.views.service') @istest def browse_release(self, mock_service): # given mock_release = { "date": "Sun, 05 Jul 2015 18:02:06 GMT", "id": "1e951912027ea6873da6985b91e50c47f645ae1a", "target": "d770e558e21961ad6cfdf0ff7df0eb5d7d4f0754", "synthetic": False, "target_type": "revision", "author": { "email": "torvalds@linux-foundation.org", "name": "Linus Torvalds" }, "message": "Linux 4.2-rc1\n", "name": "v4.2-rc1" } mock_service.lookup_release.return_value = mock_release expected_release = { "date": "Sun, 05 Jul 2015 18:02:06 GMT", "id": "1e951912027ea6873da6985b91e50c47f645ae1a", "target": '/browse/revision/d770e558e21961ad6cfdf0ff7df0' 'eb5d7d4f0754/', "synthetic": False, "target_type": "revision", "author": "Linus Torvalds ", "message": "Linux 4.2-rc1\n", "name": "v4.2-rc1" } # when rv = self.client.get('/browse/release/426/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('release.html') self.assertEqual(self.get_context_variable('sha1_git'), '426') self.assertEqual(self.get_context_variable('release'), expected_release) self.assertEqual(self.get_context_variable('keys'), [ 'id', 'name', 'date', 'message', 'author', 'target', 'target_type']) mock_service.lookup_release.assert_called_once_with('426') @patch('swh.web.ui.views.service') @istest def browse_revision_not_found(self, mock_service): # given mock_service.lookup_revision.return_value = None # when rv = self.client.get('/browse/revision/1/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision.html') self.assertEqual(self.get_context_variable('sha1_git'), '1') self.assertEqual( self.get_context_variable('message'), 'Revision 1 not found!') mock_service.lookup_revision.assert_called_once_with('1') @patch('swh.web.ui.views.service') @istest def browse_revision_bad_input(self, 
mock_service): # given mock_service.lookup_revision.side_effect = BadInputExc('wrong input') # when rv = self.client.get('/browse/revision/426/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision.html') self.assertEqual(self.get_context_variable('sha1_git'), '426') mock_service.lookup_revision.assert_called_once_with('426') @patch('swh.web.ui.views.service') @istest def browse_revision_found(self, mock_service): # given mock_revision = { 'id': 'd770e558e21961ad6cfdf0ff7df0eb5d7d4f0754', 'date': 'Sun, 05 Jul 2015 18:01:52 GMT', 'committer': { 'email': 'torvalds@linux-foundation.org', 'name': 'Linus Torvalds' }, 'committer_date': 'Sun, 05 Jul 2015 18:01:52 GMT', 'metadata': None, 'type': 'git', 'author': { 'email': 'torvalds@linux-foundation.org', 'name': 'Linus Torvalds' }, 'message': 'Linux 4.2-rc1\n', 'synthetic': False, 'directory': '2a1dbabeed4dcf1f4a4c441993b2ffc9d972780b', 'parents': [ 'a585d2b738bfa26326b3f1f40f0f1eda0c067ccf' ], } mock_service.lookup_revision.return_value = mock_revision expected_revision = { 'id': 'd770e558e21961ad6cfdf0ff7df0eb5d7d4f0754', 'date': 'Sun, 05 Jul 2015 18:01:52 GMT', 'committer': 'Linus Torvalds ', 'committer_date': 'Sun, 05 Jul 2015 18:01:52 GMT', 'type': 'git', 'author': 'Linus Torvalds ', 'message': 'Linux 4.2-rc1\n', 'synthetic': False, 'metadata': None, 'parents': [ '/browse/revision/a585d2b738bfa26326b3f1f40f0f1eda0c067ccf/' ], 'directory': '/browse/directory/2a1dbabeed4dcf1f4a4c441993b2f' 'fc9d972780b/', } # when rv = self.client.get('/browse/revision/426/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision.html') self.assertEqual(self.get_context_variable('sha1_git'), '426') self.assertEqual(self.get_context_variable('revision'), expected_revision) mock_service.lookup_revision.assert_called_once_with('426') @istest def browse_revision_history_same_sha1(self): # when rv = self.client.get('/browse/revision/10/history/10/') # then 
self.assertEquals(rv.status_code, 302) @patch('swh.web.ui.views.service') @istest def browse_revision_history_not_found(self, mock_service): # given mock_service.lookup_revision_with_context.return_value = None # when rv = self.client.get('/browse/revision/1/history/2/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision.html') self.assertEqual(self.get_context_variable('sha1_git_root'), '1') self.assertEqual(self.get_context_variable('sha1_git'), '2') self.assertEqual( self.get_context_variable('message'), "Possibly sha1_git '2' is not an ancestor of sha1_git_root '1'") mock_service.lookup_revision_with_context.assert_called_once_with( '1', '2', 100) @patch('swh.web.ui.views.service') @istest def browse_revision_history_root_not_found(self, mock_service): # given mock_service.lookup_revision_with_context.side_effect = NotFoundExc( 'Revision root 123 not found') # when rv = self.client.get('/browse/revision/123/history/456/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision.html') self.assertEqual(self.get_context_variable('sha1_git_root'), '123') self.assertEqual(self.get_context_variable('sha1_git'), '456') self.assertEqual( self.get_context_variable('message'), "Revision root 123 not found") mock_service.lookup_revision_with_context.assert_called_once_with( '123', '456', 100) @patch('swh.web.ui.views.service') @istest def browse_revision_history_root_bad_input(self, mock_service): # given mock_service.lookup_revision_with_context.side_effect = NotFoundExc( 'Input incorrect') # when rv = self.client.get('/browse/revision/321/history/654/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision.html') self.assertEqual(self.get_context_variable('sha1_git_root'), '321') self.assertEqual(self.get_context_variable('sha1_git'), '654') self.assertEqual( self.get_context_variable('message'), "Input incorrect") mock_service.lookup_revision_with_context.assert_called_once_with( 
'321', '654', 100) @patch('swh.web.ui.views.utils') @patch('swh.web.ui.views.service') @istest def browse_revision_history(self, mock_service, mock_utils): # given stub_revision = {'id': 'some-rev'} mock_service.lookup_revision_with_context.return_value = stub_revision expected_revision = {'id': 'some-rev-id'} mock_utils.prepare_revision_view.return_value = expected_revision # when rv = self.client.get('/browse/revision/426/history/789/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision.html') self.assertEqual(self.get_context_variable('sha1_git_root'), '426') self.assertEqual(self.get_context_variable('sha1_git'), 'some-rev-id') self.assertEqual(self.get_context_variable('revision'), expected_revision) mock_service.lookup_revision_with_context.assert_called_once_with( '426', '789', 100) mock_utils.prepare_revision_view.assert_called_once_with(stub_revision) - @patch('swh.web.ui.views.service') + @patch('swh.web.ui.views.api') @istest - def browse_revision_directory_not_found(self, mock_service): + def browse_revision_directory_KO_not_found(self, mock_api): # given - mock_service.lookup_directory_through_revision.side_effect = NotFoundExc( # noqa + mock_api.api_directory_with_revision.side_effect = NotFoundExc( 'Not found!') # when rv = self.client.get('/browse/revision/1/directory/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision-directory.html') self.assertEqual(self.get_context_variable('sha1_git'), '1') self.assertEqual(self.get_context_variable('path'), '.') self.assertIsNone(self.get_context_variable('result')) self.assertEqual( self.get_context_variable('message'), "Not found!") - mock_service.lookup_directory_through_revision.assert_called_once_with( - {'sha1_git': '1'}, None) + mock_api.api_directory_with_revision.assert_called_once_with( + '1', None) - @patch('swh.web.ui.views.service') + @patch('swh.web.ui.views.api') @istest - def browse_revision_directory_bad_input(self, mock_service): 
+ def browse_revision_directory_KO_bad_input(self, mock_api): # given - mock_service.lookup_directory_through_revision.side_effect = BadInputExc( # noqa + mock_api.api_directory_with_revision.side_effect = BadInputExc( # noqa 'Bad input!') # when rv = self.client.get('/browse/revision/10/directory/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision-directory.html') self.assertEqual(self.get_context_variable('sha1_git'), '10') self.assertEqual(self.get_context_variable('path'), '.') self.assertIsNone(self.get_context_variable('result')) self.assertEqual( self.get_context_variable('message'), "Bad input!") - mock_service.lookup_directory_through_revision.assert_called_once_with( - {'sha1_git': '10'}, None) - - @patch('swh.web.ui.views.service') - @istest - def browse_revision_directory_not_implemented(self, mock_service): - # given - mock_service.lookup_directory_through_revision.side_effect = NotImplementedError( # noqa - 'Oops! Not implemented!') - - # when - rv = self.client.get('/browse/revision/10/directory/path/') + mock_api.api_directory_with_revision.assert_called_once_with( + '10', None) - # then - self.assertEquals(rv.status_code, 200) - self.assert_template_used('revision-directory.html') - self.assertEqual(self.get_context_variable('sha1_git'), '10') - self.assertEqual(self.get_context_variable('path'), 'path') - self.assertIsNone(self.get_context_variable('result')) - self.assertEqual( - self.get_context_variable('message'), - 'Oops! 
Not implemented!') - - mock_service.lookup_directory_through_revision.assert_called_once_with( - {'sha1_git': '10'}, 'path') - - @patch('swh.web.ui.views.service') + @patch('swh.web.ui.views.api') @istest - def browse_revision_directory(self, mock_service): + def browse_revision_directory(self, mock_api): # given - stub_result0 = {'type': 'dir', - 'content': [{'id': 'some-result', - 'type': 'file', - 'name': 'blah'}]} + stub_result0 = { + 'type': 'dir', + 'revision': '100', + 'content': [ + { + 'id': 'some-result', + 'type': 'file', + 'name': 'blah', + }, + { + 'id': 'some-other-result', + 'type': 'dir', + 'name': 'foo', + } + ] + } - mock_service.lookup_directory_through_revision.return_value = ( - '100', stub_result0) + mock_api.api_directory_with_revision.return_value = stub_result0 stub_result1 = { 'type': 'dir', - 'content': [ - {'type': 'file', - 'name': 'blah', - 'link': '/browse/revision/100/directory/some/path/blah/'} + 'revision': '100', + 'content': + [ + { + 'id': 'some-result', + 'type': 'file', + 'name': 'blah', + }, + { + 'id': 'some-other-result', + 'type': 'dir', + 'name': 'foo', + } ] } # when rv = self.client.get('/browse/revision/100/directory/some/path/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision-directory.html') self.assertEqual(self.get_context_variable('sha1_git'), '100') + self.assertEqual(self.get_context_variable('revision'), '100') self.assertEqual(self.get_context_variable('path'), 'some/path') self.assertIsNone(self.get_context_variable('message')) self.assertEqual(self.get_context_variable('result'), stub_result1) - mock_service.lookup_directory_through_revision.assert_called_once_with( - {'sha1_git': '100'}, 'some/path') + mock_api.api_directory_with_revision.assert_called_once_with( + '100', 'some/path') - @patch('swh.web.ui.views.service') - @istest - def browse_revision_history_directory_redirect(self, mock_service): - # when - rv = 
self.client.get('/browse/revision/1/history/1/directory/path/to/') - - # then - self.assertEquals(rv.status_code, 301) - - @patch('swh.web.ui.views.service') + @patch('swh.web.ui.views.api') @istest - def browse_revision_history_directory_not_found(self, mock_service): + def browse_revision_history_directory_KO_not_found(self, mock_api): # given - mock_service.lookup_directory_through_revision.side_effect = NotFoundExc('not found') # noqa + mock_api.api_directory_revision_history.side_effect = NotFoundExc( + 'not found') # when - rv = self.client.get('/browse/revision/123/history/456/directory/' - '?limit=666') + rv = self.client.get('/browse/revision/123/history/456/directory/a/b/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision-directory.html') self.assertEqual(self.get_context_variable('sha1_git_root'), '123') self.assertEqual(self.get_context_variable('sha1_git'), '456') - self.assertEqual(self.get_context_variable('path'), '.') + self.assertEqual(self.get_context_variable('path'), 'a/b') self.assertEqual(self.get_context_variable('message'), 'not found') self.assertIsNone(self.get_context_variable('result')) - mock_service.lookup_directory_through_revision.assert_called_once_with( - {'sha1_git': '456', 'sha1_git_root': '123'}, None, 666) + mock_api.api_directory_revision_history.assert_called_once_with( + '123', '456', 'a/b') - @patch('swh.web.ui.views.service') + @patch('swh.web.ui.views.api') @istest - def browse_revision_history_directory_bad_input(self, mock_service): + def browse_revision_history_directory_KO_bad_input(self, mock_api): # given - mock_service.lookup_directory_through_revision.side_effect = BadInputExc('bad input') # noqa + mock_api.api_directory_revision_history.side_effect = BadInputExc( + 'bad input') # when - rv = self.client.get('/browse/revision/123/history/456/directory/') + rv = self.client.get('/browse/revision/123/history/456/directory/a/c/') # then self.assertEquals(rv.status_code, 200) 
self.assert_template_used('revision-directory.html') self.assertEqual(self.get_context_variable('sha1_git_root'), '123') self.assertEqual(self.get_context_variable('sha1_git'), '456') - self.assertEqual(self.get_context_variable('path'), '.') + self.assertEqual(self.get_context_variable('path'), 'a/c') self.assertEqual(self.get_context_variable('message'), 'bad input') self.assertIsNone(self.get_context_variable('result')) - mock_service.lookup_directory_through_revision.assert_called_once_with( - {'sha1_git': '456', 'sha1_git_root': '123'}, None, 100) + mock_api.api_directory_revision_history.assert_called_once_with( + '123', '456', 'a/c') @patch('swh.web.ui.views.service') @istest - def browse_revision_history_directory_not_implemented(self, mock_service): - # given - mock_service.lookup_directory_through_revision.side_effect = NotImplementedError('not yet') # noqa - + def browse_revision_history_directory_OK_no_trailing_slash_so_redirect( + self, mock_service): # when - rv = self.client.get('/browse/revision/123/history/456/directory/' - 'path/to/?limit=9000') + rv = self.client.get('/browse/revision/1/history/2/directory/path/to') # then - self.assertEquals(rv.status_code, 200) - self.assert_template_used('revision-directory.html') - self.assertEqual(self.get_context_variable('sha1_git_root'), '123') - self.assertEqual(self.get_context_variable('sha1_git'), '456') - self.assertEqual(self.get_context_variable('path'), 'path/to') - self.assertEqual(self.get_context_variable('message'), 'not yet') - self.assertIsNone(self.get_context_variable('result')) - - mock_service.lookup_directory_through_revision.assert_called_once_with( - {'sha1_git': '456', 'sha1_git_root': '123'}, 'path/to', 9000) + self.assertEquals(rv.status_code, 301) @patch('swh.web.ui.views.service') @istest - def browse_revision_history_directory(self, mock_service): + def browse_revision_history_directory_OK_same_sha1_redirects( + self, mock_service): + # when + rv = 
self.client.get('/browse/revision/1/history/1/directory/path/to') + + # then + self.assertEquals(rv.status_code, 301) + + @patch('swh.web.ui.views.api') + @istest + def browse_revision_history_directory(self, mock_api): # given - stub_result0 = {'type': 'dir', - 'content': [{'id': 'some-result', - 'type': 'file', - 'name': 'blah'}]} + stub_result0 = { + 'type': 'dir', + 'revision': '1000', + 'content': [{ + 'id': 'some-result', + 'type': 'file', + 'name': 'blah' + }] + } - mock_service.lookup_directory_through_revision.return_value = ( - '1000', - stub_result0) + mock_api.api_directory_revision_history.return_value = stub_result0 stub_result1 = { 'type': 'dir', - 'content': [ - {'type': 'file', - 'name': 'blah', - 'link': '/browse/revision/100/history/999/directory/' - 'path/to/blah/'} - ] + 'revision': '1000', + 'content': [{ + 'id': 'some-result', + 'type': 'file', + 'name': 'blah' + }] } # when rv = self.client.get('/browse/revision/100/history/999/directory/' 'path/to/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision-directory.html') self.assertEqual(self.get_context_variable('sha1_git_root'), '100') - self.assertEqual(self.get_context_variable('sha1_git'), '1000') + self.assertEqual(self.get_context_variable('sha1_git'), '999') + self.assertEqual(self.get_context_variable('revision'), '1000') self.assertEqual(self.get_context_variable('path'), 'path/to') self.assertIsNone(self.get_context_variable('message')) self.assertEqual(self.get_context_variable('result'), stub_result1) - mock_service.lookup_directory_through_revision.assert_called_once_with( - {'sha1_git': '999', 'sha1_git_root': '100'}, 'path/to', 100) + mock_api.api_directory_revision_history.assert_called_once_with( + '100', '999', 'path/to') @patch('swh.web.ui.views.service') @istest def browse_entity_not_found(self, mock_service): # given mock_service.lookup_entity_by_uuid.return_value = [] # when rv = self.client.get('/browse/entity/') # then 
self.assertEquals(rv.status_code, 200) self.assert_template_used('entity.html') self.assertEqual(self.get_context_variable('entities'), []) self.assertEqual( self.get_context_variable('message'), "Entity '5f4d4c51-498a-4e28-88b3-b3e4e8396cba' not found!") mock_service.lookup_entity_by_uuid.assert_called_once_with( '5f4d4c51-498a-4e28-88b3-b3e4e8396cba') @patch('swh.web.ui.views.service') @istest def browse_entity_bad_input(self, mock_service): # given mock_service.lookup_entity_by_uuid.side_effect = BadInputExc( 'wrong input') # when rv = self.client.get('/browse/entity/blah-blah-uuid/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('entity.html') self.assertEqual(self.get_context_variable('entities'), []) mock_service.lookup_entity_by_uuid.assert_called_once_with( 'blah-blah-uuid') @patch('swh.web.ui.views.service') @istest def browse_entity(self, mock_service): # given stub_entities = [ {'id': '5f4d4c51-5a9b-4e28-88b3-b3e4e8396cba'}] mock_service.lookup_entity_by_uuid.return_value = stub_entities # when rv = self.client.get('/browse/entity/' '5f4d4c51-5a9b-4e28-88b3-b3e4e8396cba/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('entity.html') self.assertEqual(self.get_context_variable('entities'), stub_entities) self.assertIsNone(self.get_context_variable('message')) mock_service.lookup_entity_by_uuid.assert_called_once_with( '5f4d4c51-5a9b-4e28-88b3-b3e4e8396cba') @patch('swh.web.ui.views.api') @istest def browse_revision_history_through_origin_KO_bad_input(self, mock_api): # given mock_api.api_history_through_revision_with_origin.side_effect = BadInputExc('Problem input.') # noqa # when rv = self.client.get('/browse/revision/origin/99' '/history/123/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision.html') self.assertIsNone(self.get_context_variable('revision')) self.assertEqual(self.get_context_variable('message'), 'Problem input.') 
mock_api.api_history_through_revision_with_origin.assert_called_once_with( # noqa 99, 'refs/heads/master', None, '123') @patch('swh.web.ui.views.api') @istest def browse_revision_history_through_origin_KO_not_found(self, mock_api): # given mock_api.api_history_through_revision_with_origin.side_effect = NotFoundExc('Not found.') # noqa # when rv = self.client.get('/browse/revision/origin/999/' 'branch/dev/history/123/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision.html') self.assertIsNone(self.get_context_variable('revision')) self.assertEqual(self.get_context_variable('message'), 'Not found.') mock_api.api_history_through_revision_with_origin.assert_called_once_with( # noqa 999, 'dev', None, '123') @patch('swh.web.ui.views.api') @istest def browse_revision_history_through_origin_KO_other_error(self, mock_api): # given mock_api.api_history_through_revision_with_origin.side_effect = ValueError('Other Error.') # noqa # when rv = self.client.get('/browse/revision/origin/438' '/branch/scratch' '/ts/2016' '/history/789/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision.html') self.assertIsNone(self.get_context_variable('revision')) self.assertEqual(self.get_context_variable('message'), 'Other Error.') mock_api.api_history_through_revision_with_origin.assert_called_once_with( # noqa 438, 'scratch', '2016', '789') @patch('swh.web.ui.views.api') @istest def browse_revision_history_through_origin(self, mock_api): # given stub_rev = {'id': 'some-id'} mock_api.api_history_through_revision_with_origin.return_value = stub_rev # noqa # when rv = self.client.get('/browse/revision/origin/99/history/123/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision.html') self.assertEqual(self.get_context_variable('revision'), stub_rev) self.assertIsNone(self.get_context_variable('message')) mock_api.api_history_through_revision_with_origin.assert_called_once_with( # noqa 99, 
'refs/heads/master', None, '123') @patch('swh.web.ui.views.api') @istest def browse_revision_with_origin_KO_not_found(self, mock_api): # given mock_api.api_revision_with_origin.side_effect = NotFoundExc( 'Not found') # when rv = self.client.get('/browse/revision/origin/1/') self.assertEquals(rv.status_code, 200) self.assert_template_used('revision.html') self.assertIsNone(self.get_context_variable('revision')) self.assertEqual(self.get_context_variable('message'), 'Not found') mock_api.api_revision_with_origin.assert_called_once_with( 1, 'refs/heads/master', None) @patch('swh.web.ui.views.api') @istest def browse_revision_with_origin_KO_bad_input(self, mock_api): # given mock_api.api_revision_with_origin.side_effect = BadInputExc( 'Bad Input') # when rv = self.client.get('/browse/revision/origin/1000/branch/dev/') self.assertEquals(rv.status_code, 200) self.assert_template_used('revision.html') self.assertIsNone(self.get_context_variable('revision')) self.assertEqual(self.get_context_variable('message'), 'Bad Input') mock_api.api_revision_with_origin.assert_called_once_with( 1000, 'dev', None) @patch('swh.web.ui.views.api') @istest def browse_revision_with_origin_KO_other(self, mock_api): # given mock_api.api_revision_with_origin.side_effect = ValueError( 'Other') # when rv = self.client.get('/browse/revision/origin/1999' '/branch/scratch/master' '/ts/1990-01-10/') self.assertEquals(rv.status_code, 200) self.assert_template_used('revision.html') self.assertIsNone(self.get_context_variable('revision')) self.assertEqual(self.get_context_variable('message'), 'Other') mock_api.api_revision_with_origin.assert_called_once_with( 1999, 'scratch/master', '1990-01-10') @patch('swh.web.ui.views.api') @istest def browse_revision_with_origin(self, mock_api): # given stub_rev = {'id': 'some-id'} mock_api.api_revision_with_origin.return_value = stub_rev # when rv = self.client.get('/browse/revision/origin/1/') self.assertEquals(rv.status_code, 200) 
self.assert_template_used('revision.html') self.assertEqual(self.get_context_variable('revision'), stub_rev) self.assertIsNone(self.get_context_variable('message')) mock_api.api_revision_with_origin.assert_called_once_with( 1, 'refs/heads/master', None) + + @patch('swh.web.ui.views.api') + @istest + def browse_revision_directory_through_origin_KO_not_found(self, mock_api): + # given + mock_api.api_directory_through_revision_with_origin.side_effect = BadInputExc( # noqa + 'this is not the robot you are looking for') + + # when + rv = self.client.get('/browse/revision/origin/2' + '/directory/') + + self.assertEquals(rv.status_code, 200) + self.assert_template_used('revision-directory.html') + self.assertIsNone(self.get_context_variable('result')) + self.assertEqual(self.get_context_variable('message'), + 'this is not the robot you are looking for') + + mock_api.api_directory_through_revision_with_origin.assert_called_once_with( # noqa + 2, 'refs/heads/master', None, None) + + @patch('swh.web.ui.views.api') + @istest + def browse_revision_directory_through_origin_KO_bad_input(self, mock_api): + # given + mock_api.api_directory_through_revision_with_origin.side_effect = BadInputExc( # noqa + 'Bad Robot') + + # when + rv = self.client.get('/browse/revision/origin/2' + '/directory/') + + self.assertEquals(rv.status_code, 200) + self.assert_template_used('revision-directory.html') + self.assertIsNone(self.get_context_variable('result')) + self.assertEqual(self.get_context_variable('message'), 'Bad Robot') + + mock_api.api_directory_through_revision_with_origin.assert_called_once_with( # noqa + 2, 'refs/heads/master', None, None) + + @patch('swh.web.ui.views.api') + @istest + def browse_revision_directory_through_origin_KO_other(self, mock_api): + # given + mock_api.api_directory_through_revision_with_origin.side_effect = ValueError( # noqa + 'Other bad stuff') + + # when + rv = self.client.get('/browse/revision/origin/2' + '/directory/') + + 
def filter_endpoints(url_map, prefix_url_rule, blacklist=()):
    """Filter endpoints by prefix url rule.

    Args:
        - url_map: iterable of rules, each one indexable by the keys 'rule',
          'methods' and 'endpoint'
        - prefix_url_rule: prefix url string
        - blacklist: collection of url rules to exclude (default: none)

    Returns:
        Dictionary of url_rule with values methods and endpoint.

        The key is the url, the associated value is a dictionary of
        'methods' (possible http methods, sorted) and 'endpoint' (python
        function).

        The rule equal to prefix_url_rule itself is never part of the result.

    """
    out = {}
    for r in url_map:
        rule = r['rule']
        # Skip the prefix itself as well as explicitly blacklisted rules.
        if rule == prefix_url_rule or rule in blacklist:
            continue

        if rule.startswith(prefix_url_rule):
            out[rule] = {'methods': sorted(map(str, r['methods'])),
                         'endpoint': r['endpoint']}
    return out
def prepare_revision_view(revision):
    """Turn a revision dictionary into its browsable representation.

    The revision is modified in place and returned:
    - 'author' and 'committer', when set, become display strings,
    - 'parents' (always) and 'children' (when present) become lists of
      'browse_revision' urls,
    - 'directory', when set, becomes a 'browse_directory' url.

    """
    for role in ('author', 'committer'):
        person = revision.get(role)
        if person:
            revision[role] = person_to_string(person)

    revision['parents'] = [
        flask.url_for('browse_revision', sha1_git=parent_id)
        for parent_id in revision.get('parents', [])
    ]

    if 'children' in revision:
        revision['children'] = [
            flask.url_for('browse_revision', sha1_git=child_id)
            for child_id in revision['children']
        ]

    if revision.get('directory'):
        revision['directory'] = flask.url_for(
            'browse_directory', sha1_git=revision['directory'])

    return revision
def filter_field_keys(obj, field_keys):
    """Given an object instance (directory or list), and a csv field keys
    to filter on.

    Return the object instance with filtered keys.

    Note: Returns obj as is if it's an instance of types not in (dictionary,
    list)

    Args:
        - obj: one object (dictionary, list...) to filter.
        - field_keys: csv string or collection (set, list...) of keys to
          filter the object on

    Returns:
        obj filtered on field_keys: a dictionary keeps only the entries whose
        key is in field_keys (values are kept as is), a list is filtered
        element by element.

    """
    if isinstance(field_keys, str):
        # A csv string must be split first: testing `key in "id,name"` is a
        # substring match and would wrongly keep keys such as 'na' or 'i'.
        field_keys = {key.strip() for key in field_keys.split(',')}

    if isinstance(obj, dict):
        return {key: value
                for key, value in obj.items()
                if key in field_keys}

    if isinstance(obj, list):
        return [filter_field_keys(e, field_keys) for e in obj]

    return obj
@app.route('/search/', methods=['GET', 'POST'])
@set_renderers(HTMLRenderer)
def search():
    """Search for hashes in swh-storage.

    One form to submit either:
    - hash query to look up in swh storage
    - some file content to upload, compute its hash and look it up in swh
      storage
    - both

    Returns:
        dict representing data to look for in swh storage.
        The following keys are returned:
        - file: File submitted for upload
        - filename: Filename submitted for upload
        - q: Query on hash to look for
        - messages: Messages detailing if data has been found or not.

    """
    env = {'filename': None,
           'q': None,
           'file': None}

    if request.method == 'POST':
        data = request.data
        # or hash and search a file
        file = request.files.get('filename')
    else:
        # GET, but also HEAD, which Flask registers automatically alongside
        # GET: read the query from the url arguments in both cases so that
        # `data` is never left unset.
        data = request.args
        file = None

    # could either be a query for sha1 hash
    q = data.get('q')

    messages = []

    if q:
        env['q'] = q

        try:
            r = service.lookup_hash(q)
            messages.append('Content with hash %s%sfound!' % (
                q, ' ' if r.get('found') else ' not '))
        except BadInputExc as e:
            messages.append(str(e))

    if file and file.filename:
        env['file'] = file
        try:
            uploaded_content = service.upload_and_search(file)
            filename = uploaded_content['filename']
            sha1 = uploaded_content['sha1']
            found = uploaded_content['found']

            messages.append('File %s with hash %s%sfound!' % (
                filename, sha1, ' ' if found else ' not '))

            env.update({
                'filename': filename,
                'sha1': sha1,
            })
        except BadInputExc as e:
            messages.append(str(e))

    # The template always receives a string for 'q', never None.
    env['q'] = q if q else ''
    env['messages'] = messages

    return render_template('upload_and_search.html', **env)
# NOTE(review): the route shows no url variable for `q` in this view of the
# file -- confirm the converter is present in the repository.
@app.route('/browse/content//raw/')
@set_renderers(HTMLRenderer)
def browse_content_data(q):
    """Given a hash and a checksum, display the content's raw data.

    Args:
        q is of the form algo_hash:hash with algo_hash in
        (sha1, sha1_git, sha256)

    Returns:
        The rendered 'content-data.html' template with:
        - content: the content found, its 'data' decoded as text (None if
          nothing was found)
        - message: a status message

    Note:
        A bad hash (BadInputExc) or an unknown content is reported through
        'message', it is not raised.

    """
    env = {}
    content = None
    try:
        content = service.lookup_content_raw(q)
        if content:
            # Raw data is not guaranteed to be valid utf-8: replace the
            # undecodable bytes instead of failing the whole page with a
            # UnicodeDecodeError.
            content['data'] = content['data'].decode('utf-8',
                                                     errors='replace')
            message = 'Content %s' % content['sha1']
        else:
            message = 'Content with %s not found.' % q
    except BadInputExc as e:
        message = str(e)

    env['message'] = message
    env['content'] = content
    return render_template('content-data.html', **env)
@app.route('/browse/origin/')
@app.route('/browse/origin//')
@set_renderers(HTMLRenderer)
def browse_origin(origin_id=1):
    """Render the 'origin.html' page for the origin origin_id.

    The template receives 'origin_id' and 'origin' (None when nothing was
    found). A 'message' is added when the origin is unknown or when the
    lookup rejects the input with BadInputExc.

    """
    env = {'origin_id': origin_id,
           'origin': None}

    try:
        found = service.lookup_origin(origin_id)
    except BadInputExc as e:
        env['message'] = str(e)
    else:
        if found:
            env['origin'] = found
        else:
            env['message'] = 'Origin %s not found!' % origin_id

    return render_template('origin.html', **env)
# NOTE(review): the route shows no url variables for `sha1_git_root` and
# `sha1_git` in this view of the file -- confirm the converters are present
# in the repository.
@app.route('/browse/revision//history//')
@set_renderers(HTMLRenderer)
def browse_revision_history(sha1_git_root, sha1_git):
    """Display information about revision sha1_git, limited to the
    sub-graph of all transitive parents of sha1_git_root.

    In other words, sha1_git is an ancestor of sha1_git_root.

    Args:
        sha1_git_root: latest revision of the browsed history.
        sha1_git: one of sha1_git_root's ancestors.
        limit: optional query parameter to limit the revisions log
        (default to 100). For now, note that this limit could impede the
        transitivity conclusion about sha1_git not being an ancestor of
        sha1_git_root (even if it is).

    Returns:
        Information on sha1_git if it is an ancestor of sha1_git_root
        including children leading to sha1_git_root.

        When both identifiers are equal, a redirect to 'browse_revision'.

        Errors (non-integer limit, BadInputExc, NotFoundExc, no revision
        found) are reported through the 'message' template variable.

    """
    env = {'sha1_git_root': sha1_git_root,
           'sha1_git': sha1_git,
           'message': None,
           'keys': [],
           'revision': None}

    try:
        limit = int(request.args.get('limit', '100'))
    except ValueError:
        # A malformed query parameter is a user error: report it like the
        # other input errors of this view instead of failing the request.
        env['message'] = "Query parameter 'limit' must be an integer."
        return render_template('revision.html', **env)

    if sha1_git == sha1_git_root:
        return redirect(url_for('browse_revision',
                                sha1_git=sha1_git,
                                limit=limit))

    try:
        revision = service.lookup_revision_with_context(sha1_git_root,
                                                        sha1_git,
                                                        limit)
        if revision:
            revision = utils.prepare_revision_view(revision)
            env.update({
                'sha1_git': revision['id'],
                'revision': revision,
            })
        else:
            env['message'] = "Possibly sha1_git '%s' is not an ancestor " \
                             "of sha1_git_root '%s'" % (sha1_git,
                                                        sha1_git_root)
    except (BadInputExc, NotFoundExc) as e:
        env['message'] = str(e)

    return render_template('revision.html', **env)
# NOTE(review): the routes show no url variables for `sha1_git_root`,
# `sha1_git` and `path` in this view of the file -- confirm the converters
# are present in the repository.
@app.route('/browse/revision/'
           '/history/'
           '/directory/')
@app.route('/browse/revision/'
           '/history/'
           '/directory//')
@set_renderers(HTMLRenderer)
def browse_revision_history_directory(sha1_git_root, sha1_git, path=None):
    """Return information about directory pointed to by the revision
    defined as: revision sha1_git, limited to the sub-graph of all
    transitive parents of sha1_git_root.

    Args:
        sha1_git_root: latest revision of the browsed history.
        sha1_git: one of sha1_git_root's ancestors.
        path: optional directory pointed to by that revision.

    Returns:
        Information on the directory pointed to by that revision, rendered
        with the 'revision-directory.html' template.

        When both identifiers are equal, a permanent redirect to
        'browse_revision_directory'.

        BadInputExc and NotFoundExc raised by the api layer are reported
        through the 'message' template variable.

    """
    env = {
        'sha1_git_root': sha1_git_root,
        'sha1_git': sha1_git,
        'path': '.' if not path else path,
        'message': None,
        'result': None
    }

    if sha1_git == sha1_git_root:
        return redirect(url_for('browse_revision_directory',
                                sha1_git=sha1_git,
                                path=path),
                        code=301)

    try:
        result = api.api_directory_revision_history(sha1_git_root,
                                                    sha1_git,
                                                    path)
        # Rewrite the content in the result itself, as the other views
        # rendering 'revision-directory.html' do, so that the template sees
        # the converted urls under result['content'].
        result['content'] = utils.prepare_data_for_view(result['content'])
        env['revision'] = result['revision']
        env['result'] = result
    except (BadInputExc, NotFoundExc) as e:
        env['message'] = str(e)

    return render_template('revision-directory.html', **env)
""" env = {'message': None, 'revision': None} try: env['revision'] = api.api_revision_with_origin(origin_id, branch_name, ts) except (ValueError, NotFoundExc, BadInputExc) as e: env['message'] = str(e) return render_template('revision.html', **env) @app.route('/browse/revision' '/origin/' '/history//') @app.route('/browse/revision' '/origin/' '/branch/' '/history//') @app.route('/browse/revision' '/origin/' '/branch/' '/ts/' '/history//') @set_renderers(HTMLRenderer) def browse_revision_history_through_origin(origin_id, branch_name='refs/heads/master', ts=None, sha1_git=None): """Return information about revision sha1_git, limited to the sub-graph of all transitive parents of the revision root identified by (origin_id, branch_name, ts). Given sha1_git_root such root revision's identifier, in other words, sha1_git is an ancestor of sha1_git_root. Args: origin_id: origin's identifier (default to 1). branch_name: the optional branch for the given origin (default to master). timestamp: optional timestamp (default to the nearest time crawl of timestamp). sha1_git: one of sha1_git_root's ancestors. limit: optional query parameter to limit the revisions log (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of sha1_git_root (even if it is). Returns: Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root. 
# NOTE(review): the routes show no url variables for `origin_id`,
# `branch_name`, `ts` and `path` in this view of the file -- confirm the
# converters are present in the repository.
@app.route('/browse/revision'
           '/origin/'
           '/directory/')
@app.route('/browse/revision'
           '/origin/'
           '/directory/')
@app.route('/browse/revision'
           '/origin/'
           '/branch/'
           '/directory/')
@app.route('/browse/revision'
           '/origin/'
           '/branch/'
           '/directory//')
@app.route('/browse/revision'
           '/origin/'
           '/branch/'
           '/ts/'
           '/directory/')
@app.route('/browse/revision'
           '/origin/'
           '/branch/'
           '/ts/'
           '/directory//')
@set_renderers(HTMLRenderer)
def browse_revision_directory_through_origin(origin_id,
                                             branch_name='refs/heads/master',
                                             ts=None,
                                             path=None):
    """Browse the directory of the revision identified by an origin.

    Args:
        origin_id: origin's identifier.
        branch_name: the optional branch for the given origin
        (default to 'refs/heads/master').
        ts: optional timestamp.
        path: optional path inside the revision's directory.

    Returns:
        The rendered 'revision-directory.html' template with the api
        result under 'result' and its revision under 'revision'.

        ValueError, BadInputExc and NotFoundExc raised by the api layer are
        reported through the 'message' template variable.

    """
    context = {
        'message': None,
        'origin_id': origin_id,
        'ts': ts,
        'path': path or '.',
        'result': None,
    }

    try:
        listing = api.api_directory_through_revision_with_origin(
            origin_id, branch_name, ts, path)
        listing['content'] = utils.prepare_data_for_view(listing['content'])
        context['revision'] = listing['revision']
        context['result'] = listing
    except (ValueError, BadInputExc, NotFoundExc) as error:
        context['message'] = str(error)

    return render_template('revision-directory.html', **context)